You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:39 UTC
[09/11] activemq-artemis git commit: Stomp refactor + track
autocreation for addresses
Stomp refactor + track autocreation for addresses
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/84df373a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/84df373a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/84df373a
Branch: refs/heads/ARTEMIS-780
Commit: 84df373add77a74015e1c6af6c2719cc3ad53d3e
Parents: d961852
Author: jbertram <jb...@apache.com>
Authored: Tue Oct 18 19:45:02 2016 +0100
Committer: jbertram <jb...@apache.com>
Committed: Thu Nov 10 20:50:59 2016 -0600
----------------------------------------------------------------------
.../apache/activemq/cli/test/ArtemisTest.java | 4 +-
.../config/ActiveMQDefaultConfiguration.java | 3 +
.../artemis/api/core/client/ClientSession.java | 2 +-
.../core/client/impl/ClientSessionImpl.java | 4 +-
.../core/impl/ActiveMQSessionContext.java | 4 +-
.../impl/wireformat/CreateAddressMessage.java | 14 +
.../spi/core/remoting/SessionContext.java | 2 +-
.../jms/client/ActiveMQMessageProducer.java | 24 +-
.../artemis/jms/client/ActiveMQSession.java | 19 +-
.../protocol/mqtt/MQTTSubscriptionManager.java | 2 +-
.../artemis/core/protocol/stomp/Stomp.java | 34 +-
.../core/protocol/stomp/StompConnection.java | 21 +-
.../protocol/stomp/StompProtocolManager.java | 16 +-
.../stomp/VersionedStompFrameHandler.java | 14 +-
.../core/management/impl/QueueControlImpl.java | 2 +-
.../journal/AbstractJournalStorageManager.java | 7 +-
.../codec/PersistentAddressBindingEncoding.java | 20 +-
.../artemis/core/postoffice/PostOffice.java | 7 +-
.../core/postoffice/impl/PostOfficeImpl.java | 30 +-
.../artemis/core/server/ActiveMQServer.java | 26 +-
.../artemis/core/server/QueueCreator.java | 32 -
.../artemis/core/server/QueueDeleter.java | 28 -
.../artemis/core/server/QueueFactory.java | 2 +-
.../artemis/core/server/ServerSession.java | 5 +-
.../core/server/impl/ActiveMQServerImpl.java | 57 +-
.../artemis/core/server/impl/AddressInfo.java | 12 +
.../impl/AutoCreatedQueueManagerImpl.java | 32 +-
.../artemis/core/server/impl/DivertImpl.java | 2 +-
.../server/impl/PostOfficeJournalLoader.java | 4 +-
.../artemis/core/server/impl/QueueImpl.java | 4 +-
.../core/server/impl/ServerSessionImpl.java | 25 +-
.../management/impl/ManagementServiceImpl.java | 3 +-
.../core/config/impl/FileConfigurationTest.java | 59 +
.../vertx/IncomingVertxEventHandler.java | 2 +-
.../integration/client/HangConsumerTest.java | 4 +-
.../integration/stomp/ConcurrentStompTest.java | 136 --
.../tests/integration/stomp/ExtraStompTest.java | 848 ---------
.../stomp/StompConnectionCleanupTest.java | 52 +-
.../integration/stomp/StompOverHttpTest.java | 78 -
.../stomp/StompOverWebsocketTest.java | 151 --
.../tests/integration/stomp/StompTest.java | 1569 +++++----------
.../tests/integration/stomp/StompTestBase.java | 548 +++---
.../stomp/StompTestWithInterceptors.java | 159 ++
.../stomp/StompTestWithLargeMessages.java | 438 +++++
.../stomp/StompTestWithMessageID.java | 77 +
.../stomp/StompTestWithSecurity.java | 28 +-
.../stomp/util/AbstractClientStompFrame.java | 77 +-
.../util/AbstractStompClientConnection.java | 100 +-
.../stomp/util/ClientStompFrame.java | 10 +-
.../stomp/util/ClientStompFrameV10.java | 10 +-
.../stomp/util/ClientStompFrameV11.java | 22 +-
.../stomp/util/ClientStompFrameV12.java | 38 +-
.../stomp/util/StompClientConnection.java | 5 +-
.../stomp/util/StompClientConnectionV10.java | 43 +-
.../stomp/util/StompClientConnectionV11.java | 104 +-
.../stomp/util/StompClientConnectionV12.java | 79 +-
.../stomp/util/StompFrameFactory.java | 2 +
.../stomp/util/StompFrameFactoryV10.java | 11 +-
.../stomp/util/StompFrameFactoryV11.java | 26 +-
.../stomp/util/StompFrameFactoryV12.java | 36 +-
.../integration/stomp/v11/ExtraStompTest.java | 342 +---
.../integration/stomp/v11/StompV11Test.java | 1789 +++++++-----------
.../integration/stomp/v11/StompV11TestBase.java | 167 --
.../integration/stomp/v12/StompV12Test.java | 1760 +++++++----------
.../core/server/impl/fakes/FakePostOffice.java | 7 +-
65 files changed, 3450 insertions(+), 5788 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
index 3d89aa8..cac6229 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
@@ -548,11 +548,11 @@ public class ArtemisTest {
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession coreSession = factory.createSession("admin", "admin", false, true, true, false, 0)) {
for (String str : queues.split(",")) {
- ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString("jms.queue." + str));
+ ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString(str));
assertTrue("Couldn't find queue " + str, queryResult.isExists());
}
for (String str : topics.split(",")) {
- ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString("jms.topic." + str));
+ ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString(str));
assertTrue("Couldn't find topic " + str, queryResult.isExists());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 5511ab6..8227e06 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -1188,8 +1188,11 @@ public final class ActiveMQDefaultConfiguration {
public static boolean getDefaultDeleteQueueOnNoConsumers() {
return DEFAULT_DELETE_QUEUE_ON_NO_CONSUMERS;
}
+<<<<<<< HEAD
public static String getInternalNamingPrefix() {
return DEFAULT_INTERNAL_NAMING_PREFIX;
}
+=======
+>>>>>>> af5f1b1... ARTEMIS-782 Added configuration elements for new address model
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index fbd33d3..35bc9f9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -198,7 +198,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
*/
int getVersion();
- void createAddress(final SimpleString address, final boolean multicast) throws ActiveMQException;
+ void createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException;
// Queue Operations ----------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 2739109..16311b0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -279,12 +279,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
}
@Override
- public void createAddress(final SimpleString address, final boolean multicast) throws ActiveMQException {
+ public void createAddress(final SimpleString address, final boolean multicast, boolean autoCreated) throws ActiveMQException {
checkClosed();
startCall();
try {
- sessionContext.createAddress(address, multicast);
+ sessionContext.createAddress(address, multicast, autoCreated);
} finally {
endCall();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 4e25037..919da19 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -584,8 +584,8 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
- public void createAddress(SimpleString address, final boolean multicast) throws ActiveMQException {
- CreateAddressMessage request = new CreateAddressMessage(address, multicast, true);
+ public void createAddress(SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException {
+ CreateAddressMessage request = new CreateAddressMessage(address, multicast, autoCreated, true);
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
index 484a2ac..10c7ff3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
@@ -26,15 +26,19 @@ public class CreateAddressMessage extends PacketImpl {
private boolean multicast;
+ private boolean autoCreated;
+
private boolean requiresResponse;
public CreateAddressMessage(final SimpleString address,
final boolean multicast,
+ final boolean autoCreated,
final boolean requiresResponse) {
this();
this.address = address;
this.multicast = multicast;
+ this.autoCreated = autoCreated;
this.requiresResponse = requiresResponse;
}
@@ -49,6 +53,7 @@ public class CreateAddressMessage extends PacketImpl {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", address=" + address);
buff.append(", multicast=" + multicast);
+ buff.append(", autoCreated=" + autoCreated);
buff.append("]");
return buff.toString();
}
@@ -65,6 +70,10 @@ public class CreateAddressMessage extends PacketImpl {
return requiresResponse;
}
+ public boolean isAutoCreated() {
+ return autoCreated;
+ }
+
public void setAddress(SimpleString address) {
this.address = address;
}
@@ -74,6 +83,7 @@ public class CreateAddressMessage extends PacketImpl {
buffer.writeSimpleString(address);
buffer.writeBoolean(multicast);
buffer.writeBoolean(requiresResponse);
+ buffer.writeBoolean(autoCreated);
}
@Override
@@ -81,6 +91,7 @@ public class CreateAddressMessage extends PacketImpl {
address = buffer.readSimpleString();
multicast = buffer.readBoolean();
requiresResponse = buffer.readBoolean();
+ autoCreated = buffer.readBoolean();
}
@Override
@@ -89,6 +100,7 @@ public class CreateAddressMessage extends PacketImpl {
int result = super.hashCode();
result = prime * result + ((address == null) ? 0 : address.hashCode());
result = prime * result + (multicast ? 1231 : 1237);
+ result = prime * result + (autoCreated ? 1231 : 1237);
result = prime * result + (requiresResponse ? 1231 : 1237);
return result;
}
@@ -109,6 +121,8 @@ public class CreateAddressMessage extends PacketImpl {
return false;
if (multicast != other.multicast)
return false;
+ if (autoCreated != other.autoCreated)
+ return false;
if (requiresResponse != other.requiresResponse)
return false;
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 79d50c1..16e8314 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -166,7 +166,7 @@ public abstract class SessionContext {
public abstract void deleteQueue(SimpleString queueName) throws ActiveMQException;
- public abstract void createAddress(SimpleString address, boolean multicast) throws ActiveMQException;
+ public abstract void createAddress(SimpleString address, boolean multicast, boolean autoCreated) throws ActiveMQException;
public abstract void createQueue(SimpleString address,
SimpleString queueName,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index c552d69..1270c19 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -403,19 +403,19 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
try {
ClientSession.AddressQuery query = clientSession.addressQuery(address);
- if (!query.isExists() && query.isAutoCreateJmsQueues()) {
- if (destination.isQueue() && !destination.isTemporary()) {
- clientSession.createAddress(address, false);
- clientSession.createQueue(address, address, null, true);
- } else if (destination.isQueue() && destination.isTemporary()) {
- clientSession.createAddress(address, false);
- clientSession.createTemporaryQueue(address, address);
- } else if (!destination.isQueue() && !destination.isTemporary()) {
- clientSession.createAddress(address, true);
- } else if (!destination.isQueue() && destination.isTemporary()) {
- clientSession.createAddress(address, true);
+ if (!query.isExists()) {
+ if (destination.isQueue() && query.isAutoCreateJmsQueues()) {
+ clientSession.createAddress(address, false, true);
+ if (destination.isTemporary()) {
+ // TODO is it right to use the address for the queue name here?
+ clientSession.createTemporaryQueue(address, address);
+ } else {
+ clientSession.createQueue(address, address, null, true);
+ }
+ } else if (!destination.isQueue() && query.isAutoCreateJmsTopics()) {
+ clientSession.createAddress(address, true, true);
}
- } else if (!query.isExists() && !query.isAutoCreateJmsQueues()) {
+ } else if (!query.isExists() && ((destination.isQueue() && !query.isAutoCreateJmsQueues()) || (!destination.isQueue() && !query.isAutoCreateJmsTopics()))) {
throw new InvalidDestinationException("Destination " + address + " does not exist");
} else {
connection.addKnownDestination(address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index d554cf8..35bbfa0 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -299,13 +299,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (jbd != null) {
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
- if (jbd.isQueue()) {
- if (!response.isExists()) {
- if (response.isAutoCreateJmsQueues()) {
- session.createAddress(jbd.getSimpleAddress(), false);
- } else {
- throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
- }
+ if (!response.isExists() && response.isAutoCreateJmsQueues()) {
+ if (jbd.isQueue()) {
+ session.createAddress(jbd.getSimpleAddress(), false, true);
+ session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true);
+ } else {
+ session.createAddress(jbd.getSimpleAddress(), true, true);
}
if (response.getQueueNames().isEmpty()) {
@@ -660,6 +659,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
*/
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
if (response.isAutoCreateJmsQueues()) {
+ // TODO create queue here in such a way that it is deleted when consumerCount == 0
+ // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers)
session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), true);
} else {
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
@@ -674,7 +675,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (!response.isExists()) {
if (response.isAutoCreateJmsQueues()) {
- session.createAddress(dest.getSimpleAddress(), true);
+ session.createAddress(dest.getSimpleAddress(), true, true);
} else {
throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
}
@@ -1106,7 +1107,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
AddressQuery query = session.addressQuery(topic.getSimpleAddress());
- if (!query.isExists() && !query.isAutoCreateJmsQueues()) {
+ if (!query.isExists() && !query.isAutoCreateJmsTopics()) {
return null;
} else {
return topic;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 1187db0..1c87f29 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -93,7 +93,7 @@ public class MQTTSubscriptionManager {
Queue q = session.getServer().locateQueue(queue);
if (q == null) {
- q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false);
+ q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false, true);
} else {
if (q.isDeleteOnNoConsumers()) {
throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index badcc1a..89c14e7 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -18,8 +18,6 @@ package org.apache.activemq.artemis.core.protocol.stomp;
/**
* The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
- *
- * @version $Revision: 57 $
*/
public interface Stomp {
@@ -27,7 +25,7 @@ public interface Stomp {
String NEWLINE = "\n";
- public interface Commands {
+ interface Commands {
String CONNECT = "CONNECT";
@@ -53,7 +51,7 @@ public interface Stomp {
String STOMP = "STOMP";
}
- public interface Responses {
+ interface Responses {
String CONNECTED = "CONNECTED";
@@ -64,7 +62,7 @@ public interface Stomp {
String RECEIPT = "RECEIPT";
}
- public interface Headers {
+ interface Headers {
String SEPARATOR = ":";
@@ -78,15 +76,17 @@ public interface Stomp {
String CONTENT_TYPE = "content-type";
- public interface Response {
+ interface Response {
String RECEIPT_ID = "receipt-id";
}
- public interface Send {
+ interface Send {
String DESTINATION = "destination";
+ String DESTINATION_TYPE = "destination-type";
+
String CORRELATION_ID = "correlation-id";
String REPLY_TO = "reply-to";
@@ -97,10 +97,10 @@ public interface Stomp {
String TYPE = "type";
- Object PERSISTENT = "persistent";
+ String PERSISTENT = "persistent";
}
- public interface Message {
+ interface Message {
String MESSAGE_ID = "message-id";
@@ -129,7 +129,7 @@ public interface Stomp {
String VALIDATED_USER = "JMSXUserID";
}
- public interface Subscribe {
+ interface Subscribe {
String DESTINATION = "destination";
@@ -144,6 +144,8 @@ public interface Stomp {
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+ String SUBSCRIPTION_TYPE = "subscription-type";
+
String NO_LOCAL = "no-local";
public interface AckModeValues {
@@ -156,7 +158,7 @@ public interface Stomp {
}
}
- public interface Unsubscribe {
+ interface Unsubscribe {
String DESTINATION = "destination";
@@ -168,7 +170,7 @@ public interface Stomp {
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
}
- public interface Connect {
+ interface Connect {
String LOGIN = "login";
@@ -182,10 +184,10 @@ public interface Stomp {
String ACCEPT_VERSION = "accept-version";
String HOST = "host";
- Object HEART_BEAT = "heart-beat";
+ String HEART_BEAT = "heart-beat";
}
- public interface Error {
+ interface Error {
String MESSAGE = "message";
@@ -193,7 +195,7 @@ public interface Stomp {
String VERSION = "version";
}
- public interface Connected {
+ interface Connected {
String SESSION = "session";
@@ -207,7 +209,7 @@ public interface Stomp {
String HEART_BEAT = "heart-beat";
}
- public interface Ack {
+ interface Ack {
String MESSAGE_ID = "message-id";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 74d03d1..b4ae1b5 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -245,18 +245,20 @@ public final class StompConnection implements RemotingConnection {
}
public void checkDestination(String destination) throws ActiveMQStompException {
- autoCreateDestinationIfPossible(destination);
-
if (!manager.destinationExists(destination)) {
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
}
}
- public void autoCreateDestinationIfPossible(String queue) throws ActiveMQStompException {
- // TODO: STOMP clients will have to prefix their destination with queue:// or topic:// so we can determine what to do here
+ public void autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
try {
- manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST));
- manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, true, false);
+ // TODO check here to see if auto-creation is enabled
+ if (routingType == null || routingType.equals(AddressInfo.RoutingType.MULTICAST)) {
+ manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setAutoCreated(true));
+ } else {
+ manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST));
+ manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, null, true, false, true);
+ }
} catch (ActiveMQQueueExistsException e) {
// ignore
} catch (Exception e) {
@@ -616,8 +618,9 @@ public final class StompConnection implements RemotingConnection {
String ack,
String id,
String durableSubscriptionName,
- boolean noLocal) throws ActiveMQStompException {
- autoCreateDestinationIfPossible(destination);
+ boolean noLocal,
+ AddressInfo.RoutingType subscriptionType) throws ActiveMQStompException {
+ autoCreateDestinationIfPossible(destination, subscriptionType);
if (noLocal) {
String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
if (selector == null) {
@@ -642,7 +645,7 @@ public final class StompConnection implements RemotingConnection {
}
try {
- manager.createSubscription(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal);
+ manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal);
} catch (ActiveMQStompException e) {
throw e;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 6029b37..0c1f7dd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -368,13 +368,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
// Inner classes -------------------------------------------------
- public void createSubscription(StompConnection connection,
- String subscriptionID,
- String durableSubscriptionName,
- String destination,
- String selector,
- String ack,
- boolean noLocal) throws Exception {
+ public void subscribe(StompConnection connection,
+ String subscriptionID,
+ String durableSubscriptionName,
+ String destination,
+ String selector,
+ String ack,
+ boolean noLocal) throws Exception {
StompSession stompSession = getSession(connection);
stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID)) {
@@ -411,7 +411,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
public boolean destinationExists(String destination) {
- return server.getPostOffice().getAddresses().contains(SimpleString.toSimpleString(destination));
+ return server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(destination)) != null;
}
public ActiveMQServer getServer() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 003865c..cd3103b 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -168,7 +169,10 @@ public abstract class VersionedStompFrameHandler {
try {
connection.validate();
String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
- checkDestination(destination);
+ String destinationType = frame.getHeader(Headers.Send.DESTINATION_TYPE);
+ AddressInfo.RoutingType routingType = destinationType == null ? null : AddressInfo.RoutingType.valueOf(destinationType);
+ connection.autoCreateDestinationIfPossible(destination, routingType);
+ connection.checkDestination(destination);
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
long timestamp = System.currentTimeMillis();
@@ -197,10 +201,6 @@ public abstract class VersionedStompFrameHandler {
return response;
}
- private void checkDestination(String destination) throws ActiveMQStompException {
- connection.checkDestination(destination);
- }
-
public StompFrame onBegin(StompFrame frame) {
StompFrame response = null;
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
@@ -247,6 +247,8 @@ public abstract class VersionedStompFrameHandler {
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
+ String subscriptionType = request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE);
+ AddressInfo.RoutingType routingType = subscriptionType == null ? null : AddressInfo.RoutingType.valueOf(subscriptionType);
boolean noLocal = false;
if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
@@ -254,7 +256,7 @@ public abstract class VersionedStompFrameHandler {
}
try {
- connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+ connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
} catch (ActiveMQStompException e) {
response = e.getFrame();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 7a1bb26..c4d25ac 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -728,7 +728,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
- postOffice.route(message, null, true);
+ postOffice.route(message, true);
return "" + message.getMessageID();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 16ecdf3..b4247ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1268,7 +1268,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
- PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType(), addressInfo.getDefaultMaxQueueConsumers());
+ PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(),
+ addressInfo.getRoutingType(),
+ addressInfo.getDefaultMaxQueueConsumers(),
+ addressInfo.isDefaultDeleteOnNoConsumers(),
+ addressInfo.isAutoCreated());
readLock();
try {
@@ -1398,7 +1402,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
idGenerator.loadState(record.id, buffer);
} else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer);
- ActiveMQServerLogger.LOGGER.info("=== Loading: " + bindingEncoding);
addressBindingInfos.add(bindingEncoding);
} else if (rec == JournalRecordIds.GROUP_RECORD) {
GroupingEncoding encoding = newGroupEncoding(id, buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
index 3821b34..e47a210 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
@@ -31,6 +31,10 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
public int defaultMaxConsumers;
+ public boolean defaultDeleteOnNoConsumers;
+
+ public boolean autoCreated;
+
public AddressInfo.RoutingType routingType;
public PersistentAddressBindingEncoding() {
@@ -45,15 +49,23 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
routingType +
", defaultMaxConsumers=" +
defaultMaxConsumers +
+ ", defaultDeleteOnNoConsumers=" +
+ defaultDeleteOnNoConsumers +
+ ", autoCreated=" +
+ autoCreated +
"]";
}
public PersistentAddressBindingEncoding(final SimpleString name,
final AddressInfo.RoutingType routingType,
- final int defaultMaxConsumers) {
+ final int defaultMaxConsumers,
+ final boolean defaultDeleteOnNoConsumers,
+ final boolean autoCreated) {
this.name = name;
this.routingType = routingType;
this.defaultMaxConsumers = defaultMaxConsumers;
+ this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
+ this.autoCreated = autoCreated;
}
@Override
@@ -85,6 +97,8 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
name = buffer.readSimpleString();
routingType = AddressInfo.RoutingType.getType(buffer.readByte());
defaultMaxConsumers = buffer.readInt();
+ defaultDeleteOnNoConsumers = buffer.readBoolean();
+ autoCreated = buffer.readBoolean();
}
@Override
@@ -92,10 +106,12 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
buffer.writeSimpleString(name);
buffer.writeByte(routingType.getType());
buffer.writeInt(defaultMaxConsumers);
+ buffer.writeBoolean(defaultDeleteOnNoConsumers);
+ buffer.writeBoolean(autoCreated);
}
@Override
public int getEncodeSize() {
- return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT;
+ return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 7902352..0abd708 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -24,7 +24,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -77,26 +76,22 @@ public interface PostOffice extends ActiveMQComponent {
Map<SimpleString, Binding> getAllBindings();
- RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
+ RoutingStatus route(ServerMessage message, boolean direct) throws Exception;
RoutingStatus route(ServerMessage message,
- QueueCreator queueCreator,
Transaction tx,
boolean direct) throws Exception;
RoutingStatus route(ServerMessage message,
- QueueCreator queueCreator,
Transaction tx,
boolean direct,
boolean rejectDuplicates) throws Exception;
RoutingStatus route(ServerMessage message,
- QueueCreator queueCreator,
RoutingContext context,
boolean direct) throws Exception;
RoutingStatus route(ServerMessage message,
- QueueCreator queueCreator,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 1dba309..135597f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -63,7 +63,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -441,6 +440,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public AddressInfo removeAddressInfo(SimpleString address) {
+ try {
+ getServer().getManagementService().unregisterAddress(address);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return addressManager.removeAddressInfo(address);
}
@@ -595,39 +599,34 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public RoutingStatus route(final ServerMessage message,
- QueueCreator queueCreator,
final boolean direct) throws Exception {
- return route(message, queueCreator, (Transaction) null, direct);
+ return route(message, (Transaction) null, direct);
}
@Override
public RoutingStatus route(final ServerMessage message,
- QueueCreator queueCreator,
final Transaction tx,
final boolean direct) throws Exception {
- return route(message, queueCreator, new RoutingContextImpl(tx), direct);
+ return route(message, new RoutingContextImpl(tx), direct);
}
@Override
public RoutingStatus route(final ServerMessage message,
- final QueueCreator queueCreator,
final Transaction tx,
final boolean direct,
final boolean rejectDuplicates) throws Exception {
- return route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
+ return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
}
@Override
public RoutingStatus route(final ServerMessage message,
- final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct) throws Exception {
- return route(message, queueCreator, context, direct, true);
+ return route(message, context, direct, true);
}
@Override
public RoutingStatus route(final ServerMessage message,
- final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
@@ -657,14 +656,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+ // TODO auto-create queues here?
// first check for the auto-queue creation thing
- if (bindings == null && queueCreator != null) {
+ if (bindings == null) {
// There is no queue with this address, we will check if it needs to be created
- if (queueCreator.create(address)) {
+// if (queueCreator.create(address)) {
// TODO: this is not working!!!!
// reassign bindings if it was created
- bindings = addressManager.getBindingsForRoutingAddress(address);
- }
+// bindings = addressManager.getBindingsForRoutingAddress(address);
+// }
}
if (bindings != null) {
@@ -704,7 +704,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress);
- route(message, null, context.getTransaction(), false);
+ route(message, context.getTransaction(), false);
result = RoutingStatus.NO_BINDINGS_DLA;
}
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 09f679b..89af2a1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -233,30 +233,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
long getUptimeMillis();
/**
- * This is the queue creator responsible for automatic JMS Queue creations.
- *
- * @param queueCreator
- */
- void setJMSQueueCreator(QueueCreator queueCreator);
-
- /**
- * @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator)
- */
- QueueCreator getJMSDestinationCreator();
-
- /**
- * This is the queue deleter responsible for automatic JMS Queue deletions.
- *
- * @param queueDeleter
- */
- void setJMSQueueDeleter(QueueDeleter queueDeleter);
-
- /**
- * @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueDeleter(QueueDeleter)
- */
- QueueDeleter getJMSQueueDeleter();
-
- /**
* Returns whether the initial replication synchronization process with the backup server is complete; applicable for
* either the live or backup server.
*/
@@ -361,7 +337,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
QueueQueryResult queueQuery(SimpleString name) throws Exception;
Queue deployQueue(SimpleString address,
- SimpleString resourceName,
+ SimpleString queueName,
SimpleString filterString,
boolean durable,
boolean temporary,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java
deleted file mode 100644
index f89a2b0..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-
-public interface QueueCreator {
-
- /**
- * You should return true if you even tried to create the queue and the queue was already there.
- * As the callers of this method will use that as an indicator that they should re-route the messages.
- * *
- *
- * @return True if a queue was created.
- */
- boolean create(SimpleString address) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java
deleted file mode 100644
index d062848..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-
-public interface QueueDeleter {
-
- /**
- * @return True if a queue was deleted.
- */
- boolean delete(SimpleString queueName) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
index 64e7a5d..2557b73 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
@@ -29,7 +29,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
*/
public interface QueueFactory {
- Queue createQueueWith(final QueueConfig config);
+ Queue createQueueWith(final QueueConfig config) throws Exception;
/**
* @deprecated Replaced by {@link #createQueueWith}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 910eb22..28d283d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -87,8 +87,6 @@ public interface ServerSession extends SecurityAuth {
void markTXFailed(Throwable e);
- QueueCreator getQueueCreator();
-
List<Xid> xaGetInDoubtXids();
int xaGetTimeout();
@@ -194,7 +192,8 @@ public interface ServerSession extends SecurityAuth {
boolean temporary,
boolean durable,
Integer maxConsumers,
- Boolean deleteOnNoConsumers) throws Exception;
+ Boolean deleteOnNoConsumers,
+ final Boolean autoCreated) throws Exception;
void createSharedQueue(SimpleString address,
SimpleString name,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index df00cc1..d119891 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -117,8 +117,6 @@ import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
-import org.apache.activemq.artemis.core.server.QueueCreator;
-import org.apache.activemq.artemis.core.server.QueueDeleter;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
@@ -269,16 +267,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private FileStoreMonitor fileStoreMonitor;
- /**
- * This will be set by the JMS Queue Manager.
- */
- private QueueCreator jmsQueueCreator;
-
- /**
- * This will be set by the JMS Queue Manager.
- */
- private QueueDeleter jmsQueueDeleter;
-
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<>();
private final Semaphore activationLock = new Semaphore(1);
@@ -726,26 +714,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
- public QueueCreator getJMSDestinationCreator() {
- return jmsQueueCreator;
- }
-
- @Override
- public void setJMSQueueCreator(QueueCreator jmsQueueCreator) {
- this.jmsQueueCreator = jmsQueueCreator;
- }
-
- @Override
- public QueueDeleter getJMSQueueDeleter() {
- return jmsQueueDeleter;
- }
-
- @Override
- public void setJMSQueueDeleter(QueueDeleter jmsQueueDeleter) {
- this.jmsQueueDeleter = jmsQueueDeleter;
- }
-
- @Override
public boolean isReplicaSync() {
if (activation instanceof SharedNothingLiveActivation) {
ReplicationManager replicationManager = getReplicationManager();
@@ -1288,7 +1256,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SessionCallback callback,
OperationContext context,
boolean autoCreateJMSQueues) throws Exception {
- return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, pagingManager);
+ return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager);
}
@Override
@@ -1546,17 +1514,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public Queue deployQueue(final SimpleString address,
- final SimpleString resourceName,
+ final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
- return deployQueue(address, resourceName, filterString, durable, temporary, autoCreated, null, null);
+ return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null);
}
@Override
public Queue deployQueue(final SimpleString address,
- final SimpleString resourceName,
+ final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
@@ -1565,9 +1533,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final Boolean deleteOnNoConsumers) throws Exception {
// TODO: fix logging here as this could be for a topic or queue
- ActiveMQServerLogger.LOGGER.deployQueue(resourceName);
+ ActiveMQServerLogger.LOGGER.deployQueue(queueName);
- return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers);
+ return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers);
}
@Override
@@ -2068,6 +2036,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Deploy any predefined queues
deployQueuesFromConfiguration();
+ registerPostQueueDeletionCallback(new PostQueueDeletionCallback() {
+ // TODO delete auto-created addresses when queueCount == 0
+ @Override
+ public void callback(SimpleString address, SimpleString queueName) throws Exception {
+ if (getAddressInfo(address).isAutoCreated() && postOffice.getBindingsForAddress(address).getBindings().size() == 0) {
+ removeAddressInfo(address);
+ }
+ }
+ });
+
// We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated
callActivateCallbacks();
@@ -2340,7 +2318,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean autoCreated,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception {
-
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
if (ignoreIfExists) {
@@ -2397,7 +2374,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) {
- queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName()));
+ queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName()));
}
final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 708aeda..6ad40fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -29,6 +29,8 @@ public class AddressInfo {
private int defaultMaxQueueConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
+ private boolean autoCreated = false;
+
public AddressInfo(SimpleString name) {
this.name = name;
}
@@ -67,6 +69,15 @@ public class AddressInfo {
return this;
}
+ public boolean isAutoCreated() {
+ return autoCreated;
+ }
+
+ public AddressInfo setAutoCreated(boolean autoCreated) {
+ this.autoCreated = autoCreated;
+ return this;
+ }
+
public SimpleString getName() {
return name;
}
@@ -78,6 +89,7 @@ public class AddressInfo {
buff.append(", routingType=" + routingType);
buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers);
buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers);
+ buff.append(", autoCreated=" + autoCreated);
buff.append("]");
return buff.toString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
index 535e53b..55fb765 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
@@ -17,34 +17,48 @@
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager;
-import org.apache.activemq.artemis.core.server.QueueDeleter;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
private final SimpleString queueName;
- private final QueueDeleter deleter;
+ private final ActiveMQServer server;
private final Runnable runnable = new Runnable() {
@Override
public void run() {
- try {
- if (deleter != null) {
- deleter.delete(queueName);
+ // TODO check auto created and deleteOnNoConsumers
+ Queue queue = server.locateQueue(queueName);
+ SimpleString address = queue.getAddress();
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+ long consumerCount = queue.getConsumerCount();
+ long messageCount = queue.getMessageCount();
+
+ if (queue.getMessageCount() == 0) {
+ if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+ ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; getAutoDeleteJmsQueues = " + settings.getAutoDeleteJmsQueues());
+ }
+
+ // TODO handle this exception better
+ try {
+ server.destroyQueue(queueName, null, true, false);
+ } catch (Exception e) {
+ e.printStackTrace();
}
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
}
}
};
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
- public AutoCreatedQueueManagerImpl(QueueDeleter deleter, SimpleString queueName) {
- this.deleter = deleter;
+ public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
+ this.server = server;
this.queueName = queueName;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index e583fc0..5782379 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -104,7 +104,7 @@ public class DivertImpl implements Divert {
copy = message;
}
- postOffice.route(copy, null, context.getTransaction(), false);
+ postOffice.route(copy, context.getTransaction(), false);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 76fc69b..eb31737 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -151,9 +151,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
.deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers())
.maxConsumers(queueBindingInfo.getMaxConsumers());
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
- if (queue.isAutoCreated()) {
- queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
- }
+ queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
if (queueBindingInfo.getQueueStatusEncodings() != null) {
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index c391b90..7c614ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2326,7 +2326,7 @@ public class QueueImpl implements Queue {
copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
}
- postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
+ postOffice.route(copyMessage, tx, false, rejectDuplicate);
acknowledge(tx, ref);
}
@@ -2530,7 +2530,7 @@ public class QueueImpl implements Queue {
copyMessage.setAddress(address);
- postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
+ postOffice.route(copyMessage, tx, false, rejectDuplicate);
acknowledge(tx, ref, reason);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 3eb5fcf..80fa7b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -68,7 +68,6 @@ import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@@ -166,8 +165,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private final OperationContext context;
- private QueueCreator queueCreator;
-
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<>();
@@ -202,7 +199,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString defaultAddress,
final SessionCallback callback,
final OperationContext context,
- final QueueCreator queueCreator,
final PagingManager pagingManager) throws Exception {
this.username = username;
@@ -250,8 +246,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
remotingConnection.addFailureListener(this);
this.context = context;
- this.queueCreator = queueCreator;
-
if (!xa) {
tx = newTransaction();
}
@@ -389,11 +383,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
- @Override
- public QueueCreator getQueueCreator() {
- return queueCreator;
- }
-
protected void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception {
if (securityEnabled) {
securityStore.check(address, checkType, auth);
@@ -499,7 +488,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString filterString,
final boolean temporary,
final boolean durable) throws Exception {
- return createQueue(address, name, filterString, temporary, durable, null, null);
+ return createQueue(address, name, filterString, temporary, durable, null, null, false);
}
@Override
@@ -509,7 +498,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean temporary,
final boolean durable,
final Integer maxConsumers,
- final Boolean deleteOnNoConsumers) throws Exception {
+ final Boolean deleteOnNoConsumers,
+ final Boolean autoCreated) throws Exception {
if (durable) {
// make sure the user has privileges to create this queue
securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this);
@@ -519,7 +509,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.checkQueueCreationLimit(getUsername());
- Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers);
+ Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers);
if (temporary) {
// Temporary queue in core simply means the queue will be deleted if
@@ -1472,7 +1462,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
private void installJMSHooks() {
- this.queueCreator = server.getJMSDestinationCreator();
}
private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses() {
@@ -1592,11 +1581,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
try {
- if (noAutoCreateQueue) {
- result = postOffice.route(msg, null, routingContext, direct);
- } else {
- result = postOffice.route(msg, queueCreator, routingContext, direct);
- }
+ result = postOffice.route(msg, routingContext, direct);
Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 636aa82..349d36a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -464,7 +464,6 @@ public class ManagementServiceImpl implements ManagementService {
public synchronized void registerInRegistry(final String resourceName, final Object managedResource) {
unregisterFromRegistry(resourceName);
- ActiveMQServerLogger.LOGGER.info("Registering: " + resourceName);
registry.put(resourceName, managedResource);
}
@@ -653,7 +652,7 @@ public class ManagementServiceImpl implements ManagementService {
notificationMessage.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
}
- postOffice.route(notificationMessage, null, false);
+ postOffice.route(notificationMessage, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index eb97b17..59f2cdf 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -52,6 +52,9 @@ import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.activemq.artemis.core.config.CoreAddressConfiguration.RoutingType.ANYCAST;
+import static org.apache.activemq.artemis.core.config.CoreAddressConfiguration.RoutingType.MULTICAST;
+
public class FileConfigurationTest extends ConfigurationImplTest {
private final String fullConfigurationName = "ConfigurationTest-full-config.xml";
@@ -419,6 +422,62 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("addr2", queueConfiguration.getAddress());
}
+ private void verifyAddresses() {
+ assertEquals(2, conf.getAddressConfigurations().size());
+
+ // Addr 1
+ CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0);
+ assertEquals("addr1", addressConfiguration.getName());
+ assertEquals(ANYCAST, addressConfiguration.getRoutingType());
+ assertEquals(2, addressConfiguration.getQueueConfigurations().size());
+
+ // Addr 1 Queue 1
+ CoreQueueConfiguration queueConfiguration = addressConfiguration.getQueueConfigurations().get(0);
+
+ assertEquals("q1", queueConfiguration.getName());
+ assertFalse(queueConfiguration.isDurable());
+ assertEquals("color='blue'", queueConfiguration.getFilterString());
+ assertEquals(addressConfiguration.getDefaultDeleteOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers());
+ assertEquals("addr1", queueConfiguration.getAddress());
+ assertEquals(addressConfiguration.getDefaultMaxConsumers(), queueConfiguration.getMaxConsumers());
+
+ // Addr 1 Queue 2
+ queueConfiguration = addressConfiguration.getQueueConfigurations().get(1);
+
+ assertEquals("q2", queueConfiguration.getName());
+ assertTrue(queueConfiguration.isDurable());
+ assertEquals("color='green'", queueConfiguration.getFilterString());
+ assertEquals(new Integer(-1), queueConfiguration.getMaxConsumers());
+ assertFalse(queueConfiguration.getDeleteOnNoConsumers());
+ assertEquals("addr1", queueConfiguration.getAddress());
+
+ // Addr 2
+ addressConfiguration = conf.getAddressConfigurations().get(1);
+ assertEquals("addr2", addressConfiguration.getName());
+ assertEquals(MULTICAST, addressConfiguration.getRoutingType());
+ assertEquals(2, addressConfiguration.getQueueConfigurations().size());
+
+ // Addr 2 Queue 1
+ queueConfiguration = addressConfiguration.getQueueConfigurations().get(0);
+
+ assertEquals("q3", queueConfiguration.getName());
+ assertTrue(queueConfiguration.isDurable());
+ assertEquals("color='red'", queueConfiguration.getFilterString());
+ assertEquals(new Integer(10), queueConfiguration.getMaxConsumers());
+ assertEquals(addressConfiguration.getDefaultDeleteOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers());
+ assertEquals("addr2", queueConfiguration.getAddress());
+
+ // Addr 2 Queue 2
+ queueConfiguration = addressConfiguration.getQueueConfigurations().get(1);
+
+ assertEquals("q4", queueConfiguration.getName());
+ assertTrue(queueConfiguration.isDurable());
+ assertNull(queueConfiguration.getFilterString());
+ assertEquals(addressConfiguration.getDefaultMaxConsumers(), queueConfiguration.getMaxConsumers());
+ assertTrue(queueConfiguration.getDeleteOnNoConsumers());
+ assertEquals("addr2", queueConfiguration.getAddress());
+ }
+
@Test
public void testSecuritySettingPlugin() throws Exception {
FileConfiguration fc = new FileConfiguration();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java
index a5a5015..4d89e6d 100644
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java
+++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java
@@ -154,7 +154,7 @@ class IncomingVertxEventHandler implements ConnectorService {
manualEncodeVertxMessageBody(msg.getBodyBuffer(), message.body(), type);
try {
- postOffice.route(msg, null, false);
+ postOffice.route(msg, false);
} catch (Exception e) {
ActiveMQVertxLogger.LOGGER.error("failed to route msg " + msg, e);
}