You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/04 13:29:15 UTC
[12/13] activemq-artemis git commit: Remove JMS prefixes
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
index 456bb58..f10962e 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/impl/JMSServerManagerImpl.java
@@ -28,7 +28,6 @@ import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -57,12 +56,9 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
-import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.QueueCreator;
-import org.apache.activemq.artemis.core.server.QueueDeleter;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.reload.ReloadCallback;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
@@ -389,15 +385,15 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
return;
}
- server.setJMSQueueCreator(new JMSDestinationCreator());
-
- server.setJMSQueueDeleter(new JMSQueueDeleter());
+// server.setJMSQueueCreator(new JMSDestinationCreator());
+//
+// server.setJMSQueueDeleter(new JMSQueueDeleter());
server.registerActivateCallback(this);
- server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback());
-
- server.registerPostQueueDeletionCallback(new JMSPostQueueDeletionCallback());
+// server.registerPostQueueCreationCallback(new JMSPostQueueCreationCallback());
+//
+// server.registerPostQueueDeletionCallback(new JMSPostQueueDeletionCallback());
/**
* See this method's javadoc.
* <p>
@@ -794,11 +790,11 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
public synchronized boolean destroyQueue(final String name, final boolean removeConsumers) throws Exception {
checkInitialised();
- server.destroyQueue(ActiveMQDestination.createQueueAddressFromName(name), null, !removeConsumers, removeConsumers);
+ server.destroyQueue(SimpleString.toSimpleString(name), null, !removeConsumers, removeConsumers);
// if the queue has consumers and 'removeConsumers' is false then the queue won't actually be removed
// therefore only remove the queue from Bindings, etc. if the queue is actually removed
- if (this.server.getPostOffice().getBinding(ActiveMQDestination.createQueueAddressFromName(name)) == null) {
+ if (this.server.getPostOffice().getBinding(SimpleString.toSimpleString(name)) == null) {
removeFromBindings(queues, queueBindings, name);
queues.remove(name);
@@ -823,7 +819,7 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
@Override
public synchronized boolean destroyTopic(final String name, final boolean removeConsumers) throws Exception {
checkInitialised();
- AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + ActiveMQDestination.createTopicAddressFromName(name));
+ AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.CORE_ADDRESS + name);
if (addressControl != null) {
for (String queueName : addressControl.getQueueNames()) {
Binding binding = server.getPostOffice().getBinding(new SimpleString(queueName));
@@ -1093,6 +1089,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
coreFilterString = SelectorTranslator.convertToActiveMQFilterString(selectorString);
}
+ server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQQueue.getName())).setRoutingType(AddressInfo.RoutingType.ANYCAST).setDefaultMaxQueueConsumers(-1));
+
Queue queue = server.deployQueue(SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(activeMQQueue.getAddress()), SimpleString.toSimpleString(coreFilterString), durable, false, autoCreated);
queues.put(queueName, activeMQQueue);
@@ -1128,7 +1126,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
// checks when routing messages to a topic that
// does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no
// subscriptions - core has no notion of a topic
- server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated);
+// server.deployQueue(SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(activeMQTopic.getAddress()), SimpleString.toSimpleString(JMSServerManagerImpl.REJECT_FILTER), true, false, autoCreated);
+ server.createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(activeMQTopic.getAddress())));
topics.put(topicName, activeMQTopic);
@@ -1640,95 +1639,95 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
* This class is responsible for auto-creating the JMS (and underlying core) resources when a client sends a message
* to a non-existent JMS queue or topic
*/
- class JMSDestinationCreator implements QueueCreator {
-
- @Override
- public boolean create(SimpleString address) throws Exception {
- AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
- if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) {
- return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true);
- } else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) {
- return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
- } else {
- return false;
- }
- }
- }
-
- class JMSQueueDeleter implements QueueDeleter {
-
- @Override
- public boolean delete(SimpleString queueName) throws Exception {
- Queue queue = server.locateQueue(queueName);
- SimpleString address = queue.getAddress();
- AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
- long consumerCount = queue.getConsumerCount();
- long messageCount = queue.getMessageCount();
-
- if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoDeleteJmsQueues() && queue.getMessageCount() == 0) {
- if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteJmsQueues = " + settings.isAutoDeleteJmsQueues());
- }
-
- return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false);
- } else {
- return false;
- }
- }
- }
+// class JMSDestinationCreator implements QueueCreator {
+//
+// @Override
+// public boolean create(SimpleString address) throws Exception {
+// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.isAutoCreateJmsQueues()) {
+// return internalCreateJMSQueue(false, address.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), null, true, true);
+// } else if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics()) {
+// return createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
+// } else {
+// return false;
+// }
+// }
+// }
+
+// class JMSQueueDeleter implements QueueDeleter {
+//
+// @Override
+// public boolean delete(SimpleString queueName) throws Exception {
+// Queue queue = server.locateQueue(queueName);
+// SimpleString address = queue.getAddress();
+// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+// long consumerCount = queue.getConsumerCount();
+// long messageCount = queue.getMessageCount();
+//
+// if (address.toString().startsWith(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX) && settings.getAutoDeleteJmsQueues() && queue.getMessageCount() == 0) {
+// if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
+// ActiveMQJMSServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; getAutoDeleteJmsQueues = " + settings.getAutoDeleteJmsQueues());
+// }
+//
+// return destroyQueue(queueName.toString().substring(ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX.length()), false);
+// } else {
+// return false;
+// }
+// }
+// }
/**
* When a core queue is created with a jms.topic prefix this class will create the associated JMS resources
* retroactively. This would happen if, for example, a client created a subscription a non-existent JMS topic and
* autoCreateJmsTopics = true.
*/
- class JMSPostQueueCreationCallback implements PostQueueCreationCallback {
-
- @Override
- public void callback(SimpleString queueName) throws Exception {
- Queue queue = server.locateQueue(queueName);
- String address = queue.getAddress().toString();
-
- AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
- /* When a topic is created a dummy subscription is created which never receives any messages; when the queue
- * for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the
- * queue name doesn't start with the topic prefix.
- */
- if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
- createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
- }
- }
- }
+// class JMSPostQueueCreationCallback implements PostQueueCreationCallback {
+//
+// @Override
+// public void callback(SimpleString queueName) throws Exception {
+// Queue queue = server.locateQueue(queueName);
+// String address = queue.getAddress().toString();
+//
+// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+// /* When a topic is created a dummy subscription is created which never receives any messages; when the queue
+// * for that dummy subscription is created we don't want to call createTopic again. Therefore we make sure the
+// * queue name doesn't start with the topic prefix.
+// */
+// if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoCreateJmsTopics() && !queueName.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX)) {
+// createTopic(false, address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()), true);
+// }
+// }
+// }
/**
* When a core queue representing a JMS topic subscription is deleted this class will check to see if that was the
* last subscription on the topic and if so and autoDeleteJmsTopics = true then it will delete the JMS resources
* for that topic.
*/
- class JMSPostQueueDeletionCallback implements PostQueueDeletionCallback {
-
- @Override
- public void callback(SimpleString address, SimpleString queueName) throws Exception {
- Queue queue = server.locateQueue(address);
- Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(address).getBindings();
-
- AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
-
- if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoDeleteJmsTopics() && bindings.size() == 1 && queue != null && queue.isAutoCreated()) {
- try {
- destroyTopic(address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()));
- } catch (IllegalStateException e) {
- /*
- * During shutdown the callback can be invoked after the JMSServerManager is already shut down so we just
- * ignore the exception in that case
- */
- if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQJMSServerLogger.LOGGER.debug("Failed to destroy topic", e);
- }
- }
- }
- }
- }
+// class JMSPostQueueDeletionCallback implements PostQueueDeletionCallback {
+//
+// @Override
+// public void callback(SimpleString address, SimpleString queueName) throws Exception {
+// Queue queue = server.locateQueue(address);
+// Collection<Binding> bindings = server.getPostOffice().getBindingsForAddress(address).getBindings();
+//
+// AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+//
+// if (address.toString().startsWith(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX) && settings.isAutoDeleteJmsTopics() && bindings.size() == 1 && queue != null && queue.isAutoCreated()) {
+// try {
+// destroyTopic(address.toString().substring(ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX.length()));
+// } catch (IllegalStateException e) {
+// /*
+// * During shutdown the callback can be invoked after the JMSServerManager is already shut down so we just
+// * ignore the exception in that case
+// */
+// if (ActiveMQJMSServerLogger.LOGGER.isDebugEnabled()) {
+// ActiveMQJMSServerLogger.LOGGER.debug("Failed to destroy topic", e);
+// }
+// }
+// }
+// }
+// }
private final class JMSReloader implements ReloadCallback {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java
index 0c56e24..2b3f7a2 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/management/impl/JMSManagementServiceImpl.java
@@ -89,14 +89,14 @@ public class JMSManagementServiceImpl implements JMSManagementService {
ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(queue.getQueueName());
JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jmsServerManager, counter);
managementService.registerInJMX(objectName, control);
- managementService.registerInRegistry(ResourceNames.JMS_QUEUE + queue.getQueueName(), control);
+ managementService.registerInRegistry(queue.getQueueName(), control);
}
@Override
public synchronized void unregisterQueue(final String name) throws Exception {
ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(name);
managementService.unregisterFromJMX(objectName);
- managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name);
+ managementService.unregisterFromRegistry(name);
}
@Override
@@ -105,14 +105,14 @@ public class JMSManagementServiceImpl implements JMSManagementService {
AddressControl addressControl = (AddressControl) managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
JMSTopicControlImpl control = new JMSTopicControlImpl(topic, jmsServerManager, addressControl, managementService);
managementService.registerInJMX(objectName, control);
- managementService.registerInRegistry(ResourceNames.JMS_TOPIC + topic.getTopicName(), control);
+ managementService.registerInRegistry(topic.getTopicName(), control);
}
@Override
public synchronized void unregisterTopic(final String name) throws Exception {
ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(name);
managementService.unregisterFromJMX(objectName);
- managementService.unregisterFromRegistry(ResourceNames.JMS_TOPIC + name);
+ managementService.unregisterFromRegistry(name);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java b/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java
index ceb06e8..7e3b313 100644
--- a/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java
+++ b/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceMultipleFileConfigurationTest.java
@@ -76,7 +76,7 @@ public class EmbeddedJMSResourceMultipleFileConfigurationTest {
List<Queue> boundQueues = jmsServer.getTopicQueues(TEST_TOPIC);
assertNotNull("List should never be null", boundQueues);
- assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 2, boundQueues.size());
+ assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 1, boundQueues.size());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java b/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java
index 5ca3560..9651a7a 100644
--- a/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java
+++ b/artemis-junit/src/test/java/org/apache/activemq/artemis/junit/EmbeddedJMSResourceSingleFileConfigurationTest.java
@@ -76,7 +76,7 @@ public class EmbeddedJMSResourceSingleFileConfigurationTest {
List<Queue> boundQueues = jmsServer.getTopicQueues(TEST_TOPIC);
assertNotNull("List should never be null", boundQueues);
- assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 2, boundQueues.size());
+ assertEquals("Should have two queues bound to topic " + TEST_TOPIC, 1, boundQueues.size());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index f5b6c78..9b84dc1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -58,7 +58,8 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
* the address. This can be changed on the acceptor.
* */
- private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
+ // TODO fix this
+ private String pubSubPrefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX;
private int maxFrameSize = AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 96ce90e..6beee36 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -248,7 +248,7 @@ public class TestConversions extends Assert {
}
private void simulatePersistence(ServerMessage serverMessage) {
- serverMessage.setAddress(new SimpleString("jms.queue.SomeAddress"));
+ serverMessage.setAddress(new SimpleString("SomeAddress"));
// This is just to simulate what would happen during the persistence of the message
// We need to still be able to recover the message when we read it back
((EncodingSupport) serverMessage).encode(new EmptyBuffer());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index e53b962..5603cb8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
@@ -84,9 +85,9 @@ public class AMQConsumer {
if (openwireDestination.isTopic()) {
if (openwireDestination.isTemporary()) {
- address = new SimpleString("jms.temptopic." + physicalName);
+ address = new SimpleString(physicalName);
} else {
- address = new SimpleString("jms.topic." + physicalName);
+ address = new SimpleString(physicalName);
}
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), physicalName, info.getSubscriptionName(), selector, address);
@@ -95,7 +96,11 @@ public class AMQConsumer {
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
} else {
SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
- session.getCoreServer().getJMSDestinationCreator().create(queueName);
+ try {
+ session.getCoreServer().createQueue(queueName, queueName, null, true, false);
+ } catch (ActiveMQQueueExistsException e) {
+ // ignore
+ }
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 426f4e6..5cab686 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
@@ -145,7 +146,11 @@ public class AMQSession implements SessionCallback {
for (ActiveMQDestination openWireDest : dests) {
if (openWireDest.isQueue()) {
SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
- getCoreServer().getJMSDestinationCreator().create(queueName);
+ try {
+ getCoreServer().createQueue(queueName, queueName, null, true, false);
+ } catch (ActiveMQQueueExistsException e) {
+ // ignore
+ }
}
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 05e1e34..a6e7292 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -28,11 +28,6 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.util.ByteSequence;
-import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_QUEUE_ADDRESS_PREFIX;
-import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_QUEUE_ADDRESS_PREFIX;
-import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TEMP_TOPIC_ADDRESS_PREFIX;
-import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
-
public class OpenWireUtil {
public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
@@ -45,15 +40,15 @@ public class OpenWireUtil {
public static SimpleString toCoreAddress(ActiveMQDestination dest) {
if (dest.isQueue()) {
if (dest.isTemporary()) {
- return new SimpleString(JMS_TEMP_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName());
+ return new SimpleString(dest.getPhysicalName());
} else {
- return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName());
+ return new SimpleString(dest.getPhysicalName());
}
} else {
if (dest.isTemporary()) {
- return new SimpleString(JMS_TEMP_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName());
+ return new SimpleString(dest.getPhysicalName());
} else {
- return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName());
+ return new SimpleString(dest.getPhysicalName());
}
}
}
@@ -66,7 +61,7 @@ public class OpenWireUtil {
*/
public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
String address = message.getAddress().toString();
- String strippedAddress = address.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
+ String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
if (actualDestination.isQueue()) {
return new ActiveMQQueue(strippedAddress);
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index a6ddf68..74d03d1 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
@@ -37,8 +38,8 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -252,11 +253,12 @@ public final class StompConnection implements RemotingConnection {
}
public void autoCreateDestinationIfPossible(String queue) throws ActiveMQStompException {
+ // TODO: STOMP clients will have to prefix their destination with queue:// or topic:// so we can determine what to do here
try {
- QueueCreator queueCreator = manager.getServer().getJMSDestinationCreator();
- if (queueCreator != null) {
- queueCreator.create(SimpleString.toSimpleString(queue));
- }
+ manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST));
+ manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, true, false);
+ } catch (ActiveMQQueueExistsException e) {
+ // ignore
} catch (Exception e) {
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index f86dd92..d207544 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -285,7 +286,7 @@ public class StompSession implements SessionCallback {
receiveCredits = -1;
}
- if (destination.startsWith("jms.topic")) {
+ if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType().equals(AddressInfo.RoutingType.MULTICAST)) {
// subscribes to a topic
pubSub = true;
if (durableSubscriptionName != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java
index 9e10ef7..f012020 100644
--- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java
+++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/integration/EmbeddedRestActiveMQJMSTest.java
@@ -65,7 +65,7 @@ public class EmbeddedRestActiveMQJMSTest {
List<String> connectors = createInVmConnector();
server.getEmbeddedJMS().getJMSServerManager().createConnectionFactory("ConnectionFactory", false, JMSFactoryType.CF, connectors, "ConnectionFactory");
- ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/jms.queue.exampleQueue"));
+ ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/exampleQueue"));
ClientResponse<?> response = request.head();
response.releaseConnection();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java
index dea9c0e..dc0ea0f 100644
--- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java
+++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/EmbeddedTest.java
@@ -94,7 +94,7 @@ public class EmbeddedTest {
@Test
public void testTransform() throws Exception {
- ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/jms.queue.exampleQueue"));
+ ClientRequest request = new ClientRequest(TestPortProvider.generateURL("/queues/exampleQueue"));
ClientResponse<?> response = request.head();
response.releaseConnection();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java
index c3228ad..77d88d1 100644
--- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java
+++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/JMSTest.java
@@ -106,7 +106,7 @@ public class JMSTest extends MessageTestBase {
}
public static Destination createDestination(String dest) {
- ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromAddress(dest);
+ ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(dest);
System.out.println("SimpleAddress: " + destination.getSimpleAddress());
return destination;
}
@@ -150,8 +150,9 @@ public class JMSTest extends MessageTestBase {
@Test
public void testJmsConsumer() throws Exception {
- String queueName = ActiveMQDestination.createQueueAddressFromName("testQueue2").toString();
- System.out.println("Queue name: " + queueName);
+ String queueName = "testQueue2";
+ String prefixedQueueName = ActiveMQDestination.createQueueAddressFromName(queueName).toString();
+ System.out.println("Queue name: " + prefixedQueueName);
QueueDeployment deployment = new QueueDeployment();
deployment.setDuplicatesAllowed(true);
deployment.setDurableSend(false);
@@ -160,7 +161,7 @@ public class JMSTest extends MessageTestBase {
Connection conn = connectionFactory.createConnection();
try {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = createDestination(queueName);
+ Destination destination = createDestination(prefixedQueueName);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new Listener());
conn.start();
@@ -196,8 +197,9 @@ public class JMSTest extends MessageTestBase {
@Test
public void testJmsProducer() throws Exception {
- String queueName = ActiveMQDestination.createQueueAddressFromName("testQueue").toString();
- System.out.println("Queue name: " + queueName);
+ String queueName = "testQueue";
+ String prefixedQueueName = ActiveMQDestination.createQueueAddressFromName(queueName).toString();
+ System.out.println("Queue name: " + prefixedQueueName);
QueueDeployment deployment = new QueueDeployment();
deployment.setDuplicatesAllowed(true);
deployment.setDurableSend(false);
@@ -221,7 +223,7 @@ public class JMSTest extends MessageTestBase {
Order order = new Order();
order.setName("1");
order.setAmount("$5.00");
- publish(queueName, order, null);
+ publish(prefixedQueueName, order, null);
ClientResponse<?> res = consumeNext.request().header("Accept-Wait", "2").accept("application/xml").post(String.class);
Assert.assertEquals(200, res.getStatus());
@@ -238,7 +240,7 @@ public class JMSTest extends MessageTestBase {
Order order = new Order();
order.setName("1");
order.setAmount("$5.00");
- publish(queueName, order, null);
+ publish(prefixedQueueName, order, null);
ClientResponse<?> res = consumeNext.request().header("Accept-Wait", "2").accept("application/json").post(String.class);
Assert.assertEquals(200, res.getStatus());
@@ -255,7 +257,7 @@ public class JMSTest extends MessageTestBase {
Order order = new Order();
order.setName("2");
order.setAmount("$15.00");
- publish(queueName, order, "application/xml");
+ publish(prefixedQueueName, order, "application/xml");
ClientResponse<?> res = consumeNext.request().header("Accept-Wait", "2").post(String.class);
Assert.assertEquals(200, res.getStatus());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java
index 176d61e..1491f51 100644
--- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java
+++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/SelectorTest.java
@@ -45,12 +45,13 @@ import static org.jboss.resteasy.test.TestPortProvider.generateURL;
public class SelectorTest extends MessageTestBase {
public static ConnectionFactory connectionFactory;
- public static String topicName = ActiveMQDestination.createQueueAddressFromName("testTopic").toString();
+ public static String topicName = "testTopic";
+ public static String prefixedTopicName = ActiveMQDestination.createQueueAddressFromName(topicName).toString();
@BeforeClass
public static void setup() throws Exception {
connectionFactory = new ActiveMQJMSConnectionFactory(manager.getQueueManager().getServerLocator());
- System.out.println("Queue name: " + topicName);
+ System.out.println("Queue name: " + prefixedTopicName);
TopicDeployment deployment = new TopicDeployment();
deployment.setDuplicatesAllowed(true);
deployment.setDurableSend(false);
@@ -118,7 +119,7 @@ public class SelectorTest extends MessageTestBase {
}
public static Destination createDestination(String dest) {
- ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromAddress(dest);
+ ActiveMQDestination destination = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(dest);
System.out.println("SimpleAddress: " + destination.getSimpleAddress());
return destination;
}
@@ -203,32 +204,32 @@ public class SelectorTest extends MessageTestBase {
Order order = new Order();
order.setName("1");
order.setAmount("$5.00");
- publish(topicName, order, null, "1");
+ publish(prefixedTopicName, order, null, "1");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.oneOrder);
order.setName("2");
- publish(topicName, order, null, "2");
+ publish(prefixedTopicName, order, null, "2");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.twoOrder);
order.setName("3");
- publish(topicName, order, null, "2");
+ publish(prefixedTopicName, order, null, "2");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.twoOrder);
order.setName("4");
- publish(topicName, order, null, "1");
+ publish(prefixedTopicName, order, null, "1");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.oneOrder);
order.setName("5");
- publish(topicName, order, null, "1");
+ publish(prefixedTopicName, order, null, "1");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.oneOrder);
order.setName("6");
- publish(topicName, order, null, "1");
+ publish(prefixedTopicName, order, null, "1");
Thread.sleep(200);
Assert.assertEquals(order, PushReceiver.oneOrder);
@@ -262,17 +263,17 @@ public class SelectorTest extends MessageTestBase {
Order order = new Order();
order.setName("1");
order.setAmount("$5.00");
- publish(topicName, order, null, "1");
+ publish(prefixedTopicName, order, null, "1");
order.setName("2");
- publish(topicName, order, null, "2");
+ publish(prefixedTopicName, order, null, "2");
order.setName("3");
- publish(topicName, order, null, "2");
+ publish(prefixedTopicName, order, null, "2");
order.setName("4");
- publish(topicName, order, null, "1");
+ publish(prefixedTopicName, order, null, "1");
order.setName("5");
- publish(topicName, order, null, "1");
+ publish(prefixedTopicName, order, null, "1");
order.setName("6");
- publish(topicName, order, null, "1");
+ publish(prefixedTopicName, order, null, "1");
{
order.setName("1");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java
index 2c0bd9d..726e16e 100644
--- a/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java
+++ b/artemis-rest/src/test/java/org/apache/activemq/artemis/rest/test/XmlTest.java
@@ -27,7 +27,7 @@ public class XmlTest {
@Test
public void testPush() throws Exception {
String xml = "<push-registration id=\"111\">\n" +
- " <destination>jms.queue.bar</destination>\n" +
+ " <destination>bar</destination>\n" +
" <durable>true</durable>\n" +
" <session-count>10</session-count>\n" +
" <link rel=\"template\" href=\"http://somewhere.com/resources/{id}/messages\" method=\"PUT\"/>\n" +
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-rest/src/test/resources/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-rest/src/test/resources/broker.xml b/artemis-rest/src/test/resources/broker.xml
index 2993d98..4d76412 100644
--- a/artemis-rest/src/test/resources/broker.xml
+++ b/artemis-rest/src/test/resources/broker.xml
@@ -39,7 +39,7 @@
<security-settings>
<!--security for example queue-->
- <security-setting match="jms.queue.exampleQueue">
+ <security-setting match="exampleQueue">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index fcbf15c..9140fe4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -1505,15 +1505,29 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final String deleteNonDurableQueueRoles,
final String manageRoles,
final String browseRoles) throws Exception {
+ addSecuritySettings(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, "");
+ }
+
+ @Override
+ public void addSecuritySettings(final String addressMatch,
+ final String sendRoles,
+ final String consumeRoles,
+ final String createDurableQueueRoles,
+ final String deleteDurableQueueRoles,
+ final String createNonDurableQueueRoles,
+ final String deleteNonDurableQueueRoles,
+ final String manageRoles,
+ final String browseRoles,
+ final String createAddressRoles) throws Exception {
checkStarted();
clearIO();
try {
- Set<Role> roles = SecurityFormatter.createSecurity(sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles);
+ Set<Role> roles = SecurityFormatter.createSecurity(sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, createAddressRoles);
server.getSecurityRepository().addMatch(addressMatch, roles);
- PersistedRoles persistedRoles = new PersistedRoles(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles);
+ PersistedRoles persistedRoles = new PersistedRoles(addressMatch, sendRoles, consumeRoles, createDurableQueueRoles, deleteDurableQueueRoles, createNonDurableQueueRoles, deleteNonDurableQueueRoles, manageRoles, browseRoles, createAddressRoles);
storageManager.storeSecurityRoles(persistedRoles);
} finally {
@@ -1588,7 +1602,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
if (addressSettings.getExpiryAddress() != null) {
settings.add("expiryAddress", addressSettings.getExpiryAddress().toString());
}
- return settings.add("expiryDelay", addressSettings.getExpiryDelay()).add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()).add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()).add("maxSizeBytes", addressSettings.getMaxSizeBytes()).add("pageSizeBytes", addressSettings.getPageSizeBytes()).add("redeliveryDelay", addressSettings.getRedeliveryDelay()).add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()).add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()).add("redistributionDelay", addressSettings.getRedistributionDelay()).add("lastValueQueue", addressSettings.isLastValueQueue()).add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()).add("addressFullMessagePolicy", policy).add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()).add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()).add("slowConsumerPolicy", consumerPolicy).add("autoCreateJmsQueues", addressSettings.isAutoCreat
eJmsQueues()).add("autoDeleteJmsQueues", addressSettings.isAutoDeleteJmsQueues()).add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()).add("autoDeleteJmsTopics", addressSettings.isAutoDeleteJmsTopics()).build().toString();
+ return settings.add("expiryDelay", addressSettings.getExpiryDelay()).add("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts()).add("pageCacheMaxSize", addressSettings.getPageCacheMaxSize()).add("maxSizeBytes", addressSettings.getMaxSizeBytes()).add("pageSizeBytes", addressSettings.getPageSizeBytes()).add("redeliveryDelay", addressSettings.getRedeliveryDelay()).add("redeliveryMultiplier", addressSettings.getRedeliveryMultiplier()).add("maxRedeliveryDelay", addressSettings.getMaxRedeliveryDelay()).add("redistributionDelay", addressSettings.getRedistributionDelay()).add("lastValueQueue", addressSettings.isLastValueQueue()).add("sendToDLAOnNoRoute", addressSettings.isSendToDLAOnNoRoute()).add("addressFullMessagePolicy", policy).add("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()).add("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()).add("slowConsumerPolicy", consumerPolicy).add("autoCreateJmsQueues", addressSettings.isAutoCreat
eJmsQueues()).add("autoCreateJmsTopics", addressSettings.isAutoCreateJmsTopics()).add("autoDeleteJmsQueues", addressSettings.getAutoDeleteJmsQueues()).add("autoDeleteJmsTopics", addressSettings.getAutoDeleteJmsQueues()).build().toString();
}
@Override
@@ -1661,8 +1675,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
addressSettings.setAutoCreateJmsQueues(autoCreateJmsQueues);
addressSettings.setAutoDeleteJmsQueues(autoDeleteJmsQueues);
- addressSettings.setAutoCreateJmsTopics(autoCreateJmsTopics);
- addressSettings.setAutoDeleteJmsTopics(autoDeleteJmsTopics);
server.getAddressSettingsRepository().addMatch(address, addressSettings);
storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java
index 838be12..2240ccd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java
@@ -27,4 +27,6 @@ public interface AddressBindingInfo {
AddressInfo.RoutingType getRoutingType();
+ int getDefaultMaxConsumers();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 4d435c6..3a0c240 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -52,5 +52,5 @@ public interface QueueBindingInfo {
boolean isDeleteOnNoConsumers();
- void setDeleteOnNoConsumers();
+ void setDeleteOnNoConsumers(boolean deleteOnNoConsumers);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java
index 383a75f..ffa0dbb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRoles.java
@@ -46,6 +46,8 @@ public class PersistedRoles implements EncodingSupport {
private SimpleString browseRoles;
+ private SimpleString createAddressRoles;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -72,7 +74,8 @@ public class PersistedRoles implements EncodingSupport {
final String createNonDurableQueueRoles,
final String deleteNonDurableQueueRoles,
final String manageRoles,
- final String browseRoles) {
+ final String browseRoles,
+ final String createAddressRoles) {
super();
this.addressMatch = SimpleString.toSimpleString(addressMatch);
this.sendRoles = SimpleString.toSimpleString(sendRoles);
@@ -83,6 +86,7 @@ public class PersistedRoles implements EncodingSupport {
this.deleteNonDurableQueueRoles = SimpleString.toSimpleString(deleteNonDurableQueueRoles);
this.manageRoles = SimpleString.toSimpleString(manageRoles);
this.browseRoles = SimpleString.toSimpleString(browseRoles);
+ this.createAddressRoles = SimpleString.toSimpleString(createAddressRoles);
}
// Public --------------------------------------------------------
@@ -158,6 +162,13 @@ public class PersistedRoles implements EncodingSupport {
return browseRoles.toString();
}
+ /**
+ * @return the createAddressRoles
+ */
+ public String getCreateAddressRoles() {
+ return createAddressRoles.toString();
+ }
+
@Override
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(addressMatch);
@@ -169,6 +180,7 @@ public class PersistedRoles implements EncodingSupport {
buffer.writeNullableSimpleString(deleteNonDurableQueueRoles);
buffer.writeNullableSimpleString(manageRoles);
buffer.writeNullableSimpleString(browseRoles);
+ buffer.writeNullableSimpleString(createAddressRoles);
}
@Override
@@ -180,7 +192,8 @@ public class PersistedRoles implements EncodingSupport {
SimpleString.sizeofNullableString(createNonDurableQueueRoles) +
SimpleString.sizeofNullableString(deleteNonDurableQueueRoles) +
SimpleString.sizeofNullableString(manageRoles) +
- SimpleString.sizeofNullableString(browseRoles);
+ SimpleString.sizeofNullableString(browseRoles) +
+ SimpleString.sizeofNullableString(createAddressRoles);
}
@@ -195,6 +208,7 @@ public class PersistedRoles implements EncodingSupport {
deleteNonDurableQueueRoles = buffer.readNullableSimpleString();
manageRoles = buffer.readNullableSimpleString();
browseRoles = buffer.readNullableSimpleString();
+ createAddressRoles = buffer.readNullableSimpleString();
}
/* (non-Javadoc)
@@ -212,6 +226,7 @@ public class PersistedRoles implements EncodingSupport {
result = prime * result + ((deleteNonDurableQueueRoles == null) ? 0 : deleteNonDurableQueueRoles.hashCode());
result = prime * result + ((manageRoles == null) ? 0 : manageRoles.hashCode());
result = prime * result + ((browseRoles == null) ? 0 : browseRoles.hashCode());
+ result = prime * result + ((createAddressRoles == null) ? 0 : createAddressRoles.hashCode());
result = prime * result + ((sendRoles == null) ? 0 : sendRoles.hashCode());
result = prime * result + (int) (storeId ^ (storeId >>> 32));
return result;
@@ -269,6 +284,11 @@ public class PersistedRoles implements EncodingSupport {
return false;
} else if (!browseRoles.equals(other.browseRoles))
return false;
+ if (createAddressRoles == null) {
+ if (other.createAddressRoles != null)
+ return false;
+ } else if (!createAddressRoles.equals(other.createAddressRoles))
+ return false;
if (sendRoles == null) {
if (other.sendRoles != null)
return false;
@@ -303,6 +323,8 @@ public class PersistedRoles implements EncodingSupport {
manageRoles +
", browseRoles=" +
browseRoles +
+ ", createAddressRoles=" +
+ createAddressRoles +
"]";
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index b9e91ec..16ecdf3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1221,7 +1221,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
SimpleString filterString = filter == null ? null : filter.getFilterString();
- PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated());
+ PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isDeleteOnNoConsumers());
readLock();
try {
@@ -1268,7 +1268,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
- PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType());
+ PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType(), addressInfo.getDefaultMaxQueueConsumers());
readLock();
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
index 7ef7e4d..3821b34 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
@@ -29,6 +29,8 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
public SimpleString name;
+ public int defaultMaxConsumers;
+
public AddressInfo.RoutingType routingType;
public PersistentAddressBindingEncoding() {
@@ -41,13 +43,17 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
name +
", routingType=" +
routingType +
+ ", defaultMaxConsumers=" +
+ defaultMaxConsumers +
"]";
}
public PersistentAddressBindingEncoding(final SimpleString name,
- final AddressInfo.RoutingType routingType) {
+ final AddressInfo.RoutingType routingType,
+ final int defaultMaxConsumers) {
this.name = name;
this.routingType = routingType;
+ this.defaultMaxConsumers = defaultMaxConsumers;
}
@Override
@@ -70,19 +76,26 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
}
@Override
+ public int getDefaultMaxConsumers() {
+ return defaultMaxConsumers;
+ }
+
+ @Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
routingType = AddressInfo.RoutingType.getType(buffer.readByte());
+ defaultMaxConsumers = buffer.readInt();
}
@Override
public void encode(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(name);
buffer.writeByte(routingType.getType());
+ buffer.writeInt(defaultMaxConsumers);
}
@Override
public int getEncodeSize() {
- return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE;
+ return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 169cd7d..88bc1cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -72,12 +72,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final SimpleString address,
final SimpleString filterString,
final SimpleString user,
- final boolean autoCreated) {
+ final boolean autoCreated,
+ final int maxConsumers,
+ final boolean deleteOnNoConsumers) {
this.name = name;
this.address = address;
this.filterString = filterString;
this.user = user;
this.autoCreated = autoCreated;
+ this.maxConsumers = maxConsumers;
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
}
@Override
@@ -134,12 +138,12 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
@Override
public int getMaxConsumers() {
- return 0;
+ return maxConsumers;
}
@Override
public void setMaxConsumers(int maxConsumers) {
-
+ this.maxConsumers = maxConsumers;
}
@Override
@@ -148,8 +152,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
@Override
- public void setDeleteOnNoConsumers() {
-
+ public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 6c654bf..4c51373 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -421,11 +421,21 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public AddressInfo addAddressInfo(AddressInfo addressInfo) {
+ try {
+ getServer().getManagementService().registerAddress(addressInfo.getName());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return addressManager.addAddressInfo(addressInfo);
}
@Override
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
+ try {
+ getServer().getManagementService().registerAddress(addressInfo.getName());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return addressManager.addOrUpdateAddressInfo(addressInfo);
}
@@ -490,6 +500,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
throw new ActiveMQNonExistentQueueException();
}
+ // TODO: see whether we still want to do this or not
if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) {
pagingManager.deletePageStore(binding.getAddress());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index 969a1a9..6ed2564 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.postoffice.Address;
import org.apache.activemq.artemis.core.postoffice.AddressManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -39,7 +38,7 @@ import org.jboss.logging.Logger;
*/
public class SimpleAddressManager implements AddressManager {
- private static final Logger logger = Logger.getLogger(Page.class);
+ private static final Logger logger = Logger.getLogger(SimpleAddressManager.class);
private final ConcurrentMap<SimpleString, AddressInfo> addressInfoMap = new ConcurrentHashMap<>();
@@ -196,7 +195,7 @@ public class SimpleAddressManager implements AddressManager {
private AddressInfo updateAddressInfo(AddressInfo from, AddressInfo to) {
synchronized (from) {
from.setRoutingType(to.getRoutingType());
- from.setDefaultMaxConsumers(to.getDefaultMaxConsumers());
+ from.setDefaultMaxQueueConsumers(to.getDefaultMaxQueueConsumers());
from.setDefaultDeleteOnNoConsumers(to.isDefaultDeleteOnNoConsumers());
return from;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 2a45f29..be71a92 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -82,6 +83,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
@@ -220,6 +222,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
break;
}
+ case CREATE_ADDRESS: {
+ CreateAddressMessage request = (CreateAddressMessage) packet;
+ requiresResponse = request.isRequiresResponse();
+ session.createAddress(request.getAddress(), request.isMulticast());
+ if (requiresResponse) {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
case CREATE_QUEUE: {
CreateQueueMessage request = (CreateQueueMessage) packet;
requiresResponse = request.isRequiresResponse();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java
index 7d4cc00..abea943 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java
@@ -29,6 +29,12 @@ public enum CheckType {
return role.isConsume();
}
},
+ CREATE_ADDRESS {
+ @Override
+ public boolean hasRole(final Role role) {
+ return role.isCreateAddress();
+ }
+ },
CREATE_DURABLE_QUEUE {
@Override
public boolean hasRole(final Role role) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 9b5578c..51e1830 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -465,7 +465,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
void removeClientConnection(String clientId);
- AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo);
+ AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
- AddressInfo removeAddressInfo(SimpleString address);
+ AddressInfo removeAddressInfo(SimpleString address) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index 3b7ed71..81834be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -33,7 +33,7 @@ public final class QueueConfig {
private final boolean durable;
private final boolean temporary;
private final boolean autoCreated;
- private final int maxConsumers;
+ private final Integer maxConsumers;
private final boolean deleteOnNoConsumers;
public static final class Builder {
@@ -47,7 +47,7 @@ public final class QueueConfig {
private boolean durable;
private boolean temporary;
private boolean autoCreated;
- private int maxConsumers;
+ private Integer maxConsumers;
private boolean deleteOnNoConsumers;
private Builder(final long id, final SimpleString name) {
@@ -112,7 +112,7 @@ public final class QueueConfig {
return this;
}
- public Builder maxConsumers(final int maxConsumers) {
+ public Builder maxConsumers(final Integer maxConsumers) {
this.maxConsumers = maxConsumers;
return this;
}
@@ -185,7 +185,7 @@ public final class QueueConfig {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
- final int maxConsumers,
+ final Integer maxConsumers,
final boolean deleteOnNoConsumers) {
this.id = id;
this.address = address;
@@ -240,7 +240,7 @@ public final class QueueConfig {
return deleteOnNoConsumers;
}
- public int maxConsumers() {
+ public Integer maxConsumers() {
return maxConsumers;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index ab3898c..910eb22 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -109,6 +110,8 @@ public interface ServerSession extends SecurityAuth {
boolean temporary,
boolean durable) throws Exception;
+ AddressInfo createAddress(final SimpleString address, final boolean multicast) throws Exception;
+
void deleteQueue(SimpleString name) throws Exception;
ServerConsumer createConsumer(long consumerID,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6303fcb4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index ac30c53..423127a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -78,10 +78,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// Attributes ----------------------------------------------------
- private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString("jms.queue.");
-
- private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString("jms.topic.");
-
protected final ServerLocatorInternal serverLocator;
protected final Executor executor;
@@ -879,16 +875,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return;
}
- if (forwardingAddress.startsWith(BridgeImpl.JMS_QUEUE_ADDRESS_PREFIX) || forwardingAddress.startsWith(BridgeImpl.JMS_TOPIC_ADDRESS_PREFIX)) {
- if (!query.isExists()) {
- ActiveMQServerLogger.LOGGER.errorQueryingBridge(forwardingAddress, retryCount);
- scheduleRetryConnect();
- return;
- }
- } else {
- if (!query.isExists()) {
- ActiveMQServerLogger.LOGGER.bridgeNoBindings(getName(), getForwardingAddress(), getForwardingAddress());
- }
+ if (!query.isExists()) {
+ ActiveMQServerLogger.LOGGER.errorQueryingBridge(forwardingAddress, retryCount);
+ scheduleRetryConnect();
+ return;
}
}