You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/09 19:49:04 UTC
[20/50] [abbrv] activemq-artemis git commit: ARTEMIS-788 Stomp
refactor + track autocreation for addresses
ARTEMIS-788 Stomp refactor + track autocreation for addresses
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a88853fe
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a88853fe
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a88853fe
Branch: refs/heads/master
Commit: a88853fe5390ef254ad3f3be51c55f33dfaf9ced
Parents: 0189f15
Author: jbertram <jb...@apache.org>
Authored: Tue Oct 18 19:45:02 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000
----------------------------------------------------------------------
.../apache/activemq/cli/test/ArtemisTest.java | 4 +-
.../artemis/api/core/client/ClientSession.java | 2 +-
.../core/client/impl/ClientSessionImpl.java | 4 +-
.../core/impl/ActiveMQSessionContext.java | 4 +-
.../impl/wireformat/CreateAddressMessage.java | 14 +
.../remoting/impl/netty/TransportConstants.java | 10 +
.../spi/core/remoting/SessionContext.java | 2 +-
.../jms/client/ActiveMQMessageProducer.java | 26 +-
.../artemis/jms/client/ActiveMQSession.java | 41 +-
.../protocol/mqtt/MQTTSubscriptionManager.java | 2 +-
.../protocol/stomp/ActiveMQStompException.java | 4 +-
.../ActiveMQStompProtocolMessageBundle.java | 7 +-
.../artemis/core/protocol/stomp/Stomp.java | 34 +-
.../core/protocol/stomp/StompConnection.java | 63 +-
.../protocol/stomp/StompProtocolManager.java | 16 +-
.../stomp/VersionedStompFrameHandler.java | 43 +-
.../management/impl/AddressControlImpl.java | 2 +-
.../core/management/impl/QueueControlImpl.java | 2 +-
.../journal/AbstractJournalStorageManager.java | 7 +-
.../codec/PersistentAddressBindingEncoding.java | 20 +-
.../artemis/core/postoffice/PostOffice.java | 7 +-
.../core/postoffice/impl/PostOfficeImpl.java | 30 +-
.../artemis/core/server/ActiveMQServer.java | 26 +-
.../artemis/core/server/QueueCreator.java | 32 -
.../artemis/core/server/QueueDeleter.java | 28 -
.../artemis/core/server/QueueFactory.java | 2 +-
.../artemis/core/server/ServerSession.java | 5 +-
.../core/server/impl/ActiveMQServerImpl.java | 63 +-
.../artemis/core/server/impl/AddressInfo.java | 12 +
.../impl/AutoCreatedQueueManagerImpl.java | 32 +-
.../artemis/core/server/impl/DivertImpl.java | 2 +-
.../server/impl/PostOfficeJournalLoader.java | 4 +-
.../artemis/core/server/impl/QueueImpl.java | 4 +-
.../core/server/impl/ServerSessionImpl.java | 25 +-
.../management/impl/ManagementServiceImpl.java | 3 +-
.../core/config/impl/FileConfigurationTest.java | 8 +-
.../vertx/IncomingVertxEventHandler.java | 2 +-
.../tests/extras/jms/bridge/BridgeTestBase.java | 5 +-
.../tests/integration/amqp/ProtonTest.java | 4 +-
.../client/AutoCreateJmsDestinationTest.java | 9 +-
.../integration/client/HangConsumerTest.java | 4 +-
.../tests/integration/client/SessionTest.java | 2 +
.../jms/cluster/AutoCreateQueueClusterTest.java | 6 +-
.../jms/jms2client/NonExistentQueueTest.java | 16 +-
.../persistence/XmlImportExportTest.java | 161 --
.../integration/stomp/ConcurrentStompTest.java | 136 --
.../tests/integration/stomp/ExtraStompTest.java | 848 ---------
.../stomp/StompConnectionCleanupTest.java | 52 +-
.../integration/stomp/StompOverHttpTest.java | 78 -
.../stomp/StompOverWebsocketTest.java | 151 --
.../tests/integration/stomp/StompTest.java | 1674 ++++++----------
.../tests/integration/stomp/StompTestBase.java | 550 +++---
.../stomp/StompTestWithInterceptors.java | 159 ++
.../stomp/StompTestWithLargeMessages.java | 416 ++++
.../stomp/StompTestWithMessageID.java | 78 +
.../stomp/StompTestWithSecurity.java | 28 +-
.../stomp/util/AbstractClientStompFrame.java | 77 +-
.../util/AbstractStompClientConnection.java | 100 +-
.../stomp/util/ClientStompFrame.java | 10 +-
.../stomp/util/ClientStompFrameV10.java | 10 +-
.../stomp/util/ClientStompFrameV11.java | 22 +-
.../stomp/util/ClientStompFrameV12.java | 38 +-
.../stomp/util/StompClientConnection.java | 5 +-
.../stomp/util/StompClientConnectionV10.java | 43 +-
.../stomp/util/StompClientConnectionV11.java | 104 +-
.../stomp/util/StompClientConnectionV12.java | 79 +-
.../stomp/util/StompFrameFactory.java | 2 +
.../stomp/util/StompFrameFactoryV10.java | 11 +-
.../stomp/util/StompFrameFactoryV11.java | 28 +-
.../stomp/util/StompFrameFactoryV12.java | 38 +-
.../integration/stomp/v11/ExtraStompTest.java | 341 +---
.../integration/stomp/v11/StompV11Test.java | 1800 +++++++-----------
.../integration/stomp/v11/StompV11TestBase.java | 167 --
.../integration/stomp/v12/StompV12Test.java | 1775 +++++++----------
.../tests/util/JMSClusteredTestBase.java | 23 +-
.../artemis/jms/tests/MessageProducerTest.java | 1 +
.../activemq/artemis/jms/tests/SessionTest.java | 2 +
.../jms/tests/message/MessageHeaderTest.java | 2 +-
.../core/server/impl/fakes/FakePostOffice.java | 7 +-
79 files changed, 3610 insertions(+), 6044 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
index ba02bd3..eb3d48a 100644
--- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
+++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java
@@ -550,11 +550,11 @@ public class ArtemisTest {
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession coreSession = factory.createSession("admin", "admin", false, true, true, false, 0)) {
for (String str : queues.split(",")) {
- ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString("jms.queue." + str));
+ ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString(str));
assertTrue("Couldn't find queue " + str, queryResult.isExists());
}
for (String str : topics.split(",")) {
- ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString("jms.topic." + str));
+ ClientSession.QueueQuery queryResult = coreSession.queueQuery(SimpleString.toSimpleString(str));
assertTrue("Couldn't find topic " + str, queryResult.isExists());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index fbd33d3..35bc9f9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -198,7 +198,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
*/
int getVersion();
- void createAddress(final SimpleString address, final boolean multicast) throws ActiveMQException;
+ void createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException;
// Queue Operations ----------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 2739109..16311b0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -279,12 +279,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
}
@Override
- public void createAddress(final SimpleString address, final boolean multicast) throws ActiveMQException {
+ public void createAddress(final SimpleString address, final boolean multicast, boolean autoCreated) throws ActiveMQException {
checkClosed();
startCall();
try {
- sessionContext.createAddress(address, multicast);
+ sessionContext.createAddress(address, multicast, autoCreated);
} finally {
endCall();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 4e25037..919da19 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -584,8 +584,8 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
- public void createAddress(SimpleString address, final boolean multicast) throws ActiveMQException {
- CreateAddressMessage request = new CreateAddressMessage(address, multicast, true);
+ public void createAddress(SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException {
+ CreateAddressMessage request = new CreateAddressMessage(address, multicast, autoCreated, true);
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
index 484a2ac..10c7ff3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
@@ -26,15 +26,19 @@ public class CreateAddressMessage extends PacketImpl {
private boolean multicast;
+ private boolean autoCreated;
+
private boolean requiresResponse;
public CreateAddressMessage(final SimpleString address,
final boolean multicast,
+ final boolean autoCreated,
final boolean requiresResponse) {
this();
this.address = address;
this.multicast = multicast;
+ this.autoCreated = autoCreated;
this.requiresResponse = requiresResponse;
}
@@ -49,6 +53,7 @@ public class CreateAddressMessage extends PacketImpl {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", address=" + address);
buff.append(", multicast=" + multicast);
+ buff.append(", autoCreated=" + autoCreated);
buff.append("]");
return buff.toString();
}
@@ -65,6 +70,10 @@ public class CreateAddressMessage extends PacketImpl {
return requiresResponse;
}
+ public boolean isAutoCreated() {
+ return autoCreated;
+ }
+
public void setAddress(SimpleString address) {
this.address = address;
}
@@ -74,6 +83,7 @@ public class CreateAddressMessage extends PacketImpl {
buffer.writeSimpleString(address);
buffer.writeBoolean(multicast);
buffer.writeBoolean(requiresResponse);
+ buffer.writeBoolean(autoCreated);
}
@Override
@@ -81,6 +91,7 @@ public class CreateAddressMessage extends PacketImpl {
address = buffer.readSimpleString();
multicast = buffer.readBoolean();
requiresResponse = buffer.readBoolean();
+ autoCreated = buffer.readBoolean();
}
@Override
@@ -89,6 +100,7 @@ public class CreateAddressMessage extends PacketImpl {
int result = super.hashCode();
result = prime * result + ((address == null) ? 0 : address.hashCode());
result = prime * result + (multicast ? 1231 : 1237);
+ result = prime * result + (autoCreated ? 1231 : 1237);
result = prime * result + (requiresResponse ? 1231 : 1237);
return result;
}
@@ -109,6 +121,8 @@ public class CreateAddressMessage extends PacketImpl {
return false;
if (multicast != other.multicast)
return false;
+ if (autoCreated != other.autoCreated)
+ return false;
if (requiresResponse != other.requiresResponse)
return false;
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 14efb79..a8e613e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -203,6 +203,14 @@ public class TransportConstants {
public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size";
+ public static final String STOMP_ANYCAST_PREFIX = "stompAnycastPrefix";
+
+ public static final String DEFAULT_STOMP_ANYCAST_PREFIX = "";
+
+ public static final String STOMP_MULTICAST_PREFIX = "stompMulticastPrefix";
+
+ public static final String DEFAULT_STOMP_MULTICAST_PREFIX = "";
+
public static final String NETTY_CONNECT_TIMEOUT = "connect-timeout-millis";
public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1;
@@ -242,6 +250,8 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
+ allowableAcceptorKeys.add(TransportConstants.STOMP_ANYCAST_PREFIX);
+ allowableAcceptorKeys.add(TransportConstants.STOMP_MULTICAST_PREFIX);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 79d50c1..16e8314 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -166,7 +166,7 @@ public abstract class SessionContext {
public abstract void deleteQueue(SimpleString queueName) throws ActiveMQException;
- public abstract void createAddress(SimpleString address, boolean multicast) throws ActiveMQException;
+ public abstract void createAddress(SimpleString address, boolean multicast, boolean autoCreated) throws ActiveMQException;
public abstract void createQueue(SimpleString address,
SimpleString queueName,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index c552d69..5cbd40f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -403,20 +403,20 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
try {
ClientSession.AddressQuery query = clientSession.addressQuery(address);
- if (!query.isExists() && query.isAutoCreateJmsQueues()) {
- if (destination.isQueue() && !destination.isTemporary()) {
- clientSession.createAddress(address, false);
- clientSession.createQueue(address, address, null, true);
- } else if (destination.isQueue() && destination.isTemporary()) {
- clientSession.createAddress(address, false);
- clientSession.createTemporaryQueue(address, address);
- } else if (!destination.isQueue() && !destination.isTemporary()) {
- clientSession.createAddress(address, true);
- } else if (!destination.isQueue() && destination.isTemporary()) {
- clientSession.createAddress(address, true);
+ if (!query.isExists()) {
+ if (destination.isQueue() && query.isAutoCreateJmsQueues()) {
+ clientSession.createAddress(address, false, true);
+ if (destination.isTemporary()) {
+ // TODO is it right to use the address for the queue name here?
+ clientSession.createTemporaryQueue(address, address);
+ } else {
+ clientSession.createQueue(address, address, null, true);
+ }
+ } else if (!destination.isQueue() && query.isAutoCreateJmsTopics()) {
+ clientSession.createAddress(address, true, true);
+ } else if ((destination.isQueue() && !query.isAutoCreateJmsQueues()) || (!destination.isQueue() && !query.isAutoCreateJmsTopics())) {
+ throw new InvalidDestinationException("Destination " + address + " does not exist");
}
- } else if (!query.isExists() && !query.isAutoCreateJmsQueues()) {
- throw new InvalidDestinationException("Destination " + address + " does not exist");
} else {
connection.addKnownDestination(address);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index d554cf8..f514dba 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -299,29 +299,16 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (jbd != null) {
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());
- if (jbd.isQueue()) {
- if (!response.isExists()) {
- if (response.isAutoCreateJmsQueues()) {
- session.createAddress(jbd.getSimpleAddress(), false);
- } else {
- throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
- }
- }
-
- if (response.getQueueNames().isEmpty()) {
- if (response.isAutoCreateJmsQueues()) {
- session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true);
- } else {
- throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
- }
- }
- } else {
- if (!response.isExists()) {
- if (response.isAutoCreateJmsTopics()) {
- session.createAddress(jbd.getSimpleAddress(), true);
- } else {
- throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
- }
+ if (!response.isExists()) {
+ if (jbd.isQueue() && response.isAutoCreateJmsQueues()) {
+ // TODO create queue here in such a way that it is deleted when consumerCount == 0
+ // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers)
+ session.createAddress(jbd.getSimpleAddress(), false, true);
+ session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true);
+ } else if (!jbd.isQueue() && response.isAutoCreateJmsTopics()) {
+ session.createAddress(jbd.getSimpleAddress(), true, true);
+ } else {
+ throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}
}
}
@@ -660,6 +647,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
*/
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
if (response.isAutoCreateJmsQueues()) {
+ // TODO create queue here in such a way that it is deleted when consumerCount == 0
+ // perhaps just relying on the broker to do it is simplest (i.e. deleteOnNoConsumers)
session.createQueue(dest.getSimpleAddress(), dest.getSimpleAddress(), true);
} else {
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
@@ -673,8 +662,8 @@ public class ActiveMQSession implements QueueSession, TopicSession {
AddressQuery response = session.addressQuery(dest.getSimpleAddress());
if (!response.isExists()) {
- if (response.isAutoCreateJmsQueues()) {
- session.createAddress(dest.getSimpleAddress(), true);
+ if (response.isAutoCreateJmsTopics()) {
+ session.createAddress(dest.getSimpleAddress(), true, true);
} else {
throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist");
}
@@ -1106,7 +1095,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
AddressQuery query = session.addressQuery(topic.getSimpleAddress());
- if (!query.isExists() && !query.isAutoCreateJmsQueues()) {
+ if (!query.isExists() && !query.isAutoCreateJmsTopics()) {
return null;
} else {
return topic;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 1187db0..1c87f29 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -93,7 +93,7 @@ public class MQTTSubscriptionManager {
Queue q = session.getServer().locateQueue(queue);
if (q == null) {
- q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false);
+ q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false, true);
} else {
if (q.isDeleteOnNoConsumers()) {
throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
index f4f23e6..15fb4ac 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
@@ -41,12 +41,12 @@ public class ActiveMQStompException extends Exception {
}
public ActiveMQStompException(String msg) {
- super(msg);
+ super(msg.replace(":", ""));
handler = null;
}
public ActiveMQStompException(String msg, Throwable t) {
- super(msg, t);
+ super(msg.replace(":", ""), t);
this.body = t.getMessage();
handler = null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
index 8108f32..861c524 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
@@ -86,10 +86,10 @@ public interface ActiveMQStompProtocolMessageBundle {
ActiveMQStompException noDestination();
@Message(id = 339016, value = "Error creating subscription {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQStompException errorCreatSubscription(String subscriptionID, @Cause Exception e);
+ ActiveMQStompException errorCreatingSubscription(String subscriptionID, @Cause Exception e);
@Message(id = 339017, value = "Error unsubscribing {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQStompException errorUnsubscrib(String subscriptionID, @Cause Exception e);
+ ActiveMQStompException errorUnsubscribing(String subscriptionID, @Cause Exception e);
@Message(id = 339018, value = "Error acknowledging message {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQStompException errorAck(String messageID, @Cause Exception e);
@@ -153,4 +153,7 @@ public interface ActiveMQStompProtocolMessageBundle {
@Message(id = 339040, value = "Undefined escape sequence: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQStompException undefinedEscapeSequence(String sequence);
+
+ @Message(id = 339041, value = "Not allowed to specify {0} semantics on {1} address.", format = Message.Format.MESSAGE_FORMAT)
+ ActiveMQStompException illegalSemantics(String requested, String exists);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index badcc1a..89c14e7 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -18,8 +18,6 @@ package org.apache.activemq.artemis.core.protocol.stomp;
/**
* The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
- *
- * @version $Revision: 57 $
*/
public interface Stomp {
@@ -27,7 +25,7 @@ public interface Stomp {
String NEWLINE = "\n";
- public interface Commands {
+ interface Commands {
String CONNECT = "CONNECT";
@@ -53,7 +51,7 @@ public interface Stomp {
String STOMP = "STOMP";
}
- public interface Responses {
+ interface Responses {
String CONNECTED = "CONNECTED";
@@ -64,7 +62,7 @@ public interface Stomp {
String RECEIPT = "RECEIPT";
}
- public interface Headers {
+ interface Headers {
String SEPARATOR = ":";
@@ -78,15 +76,17 @@ public interface Stomp {
String CONTENT_TYPE = "content-type";
- public interface Response {
+ interface Response {
String RECEIPT_ID = "receipt-id";
}
- public interface Send {
+ interface Send {
String DESTINATION = "destination";
+ String DESTINATION_TYPE = "destination-type";
+
String CORRELATION_ID = "correlation-id";
String REPLY_TO = "reply-to";
@@ -97,10 +97,10 @@ public interface Stomp {
String TYPE = "type";
- Object PERSISTENT = "persistent";
+ String PERSISTENT = "persistent";
}
- public interface Message {
+ interface Message {
String MESSAGE_ID = "message-id";
@@ -129,7 +129,7 @@ public interface Stomp {
String VALIDATED_USER = "JMSXUserID";
}
- public interface Subscribe {
+ interface Subscribe {
String DESTINATION = "destination";
@@ -144,6 +144,8 @@ public interface Stomp {
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+ String SUBSCRIPTION_TYPE = "subscription-type";
+
String NO_LOCAL = "no-local";
public interface AckModeValues {
@@ -156,7 +158,7 @@ public interface Stomp {
}
}
- public interface Unsubscribe {
+ interface Unsubscribe {
String DESTINATION = "destination";
@@ -168,7 +170,7 @@ public interface Stomp {
String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
}
- public interface Connect {
+ interface Connect {
String LOGIN = "login";
@@ -182,10 +184,10 @@ public interface Stomp {
String ACCEPT_VERSION = "accept-version";
String HOST = "host";
- Object HEART_BEAT = "heart-beat";
+ String HEART_BEAT = "heart-beat";
}
- public interface Error {
+ interface Error {
String MESSAGE = "message";
@@ -193,7 +195,7 @@ public interface Stomp {
String VERSION = "version";
}
- public interface Connected {
+ interface Connected {
String SESSION = "session";
@@ -207,7 +209,7 @@ public interface Stomp {
String HEART_BEAT = "heart-beat";
}
- public interface Ack {
+ interface Ack {
String MESSAGE_ID = "message-id";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index e58d4da..4cb8fe9 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -86,6 +86,12 @@ public final class StompConnection implements RemotingConnection {
private final boolean enableMessageID;
+ private final int minLargeMessageSize;
+
+ private final String anycastPrefix;
+
+ private final String multicastPrefix;
+
private StompVersions version;
private VersionedStompFrameHandler frameHandler;
@@ -97,8 +103,6 @@ public final class StompConnection implements RemotingConnection {
private final Object sendLock = new Object();
- private final int minLargeMessageSize;
-
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorFactory factory;
@@ -162,6 +166,8 @@ public final class StompConnection implements RemotingConnection {
this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration());
this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration());
+ this.anycastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_ANYCAST_PREFIX, TransportConstants.DEFAULT_STOMP_ANYCAST_PREFIX, acceptorUsed.getConfiguration());
+ this.multicastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_MULTICAST_PREFIX, TransportConstants.DEFAULT_STOMP_MULTICAST_PREFIX, acceptorUsed.getConfiguration());
}
@Override
@@ -246,23 +252,39 @@ public final class StompConnection implements RemotingConnection {
}
public void checkDestination(String destination) throws ActiveMQStompException {
- autoCreateDestinationIfPossible(destination);
-
if (!manager.destinationExists(destination)) {
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
}
}
- public void autoCreateDestinationIfPossible(String queue) throws ActiveMQStompException {
- // TODO: STOMP clients will have to prefix their destination with queue:// or topic:// so we can determine what to do here
+ public boolean autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
+ boolean result = false;
+
try {
- manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST));
- manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, true, false);
+ if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) {
+ // TODO check here to see if auto-creation is enabled
+ if (routingType.equals(AddressInfo.RoutingType.MULTICAST)) {
+ manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setAutoCreated(true));
+ } else {
+ manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST).setAutoCreated(true));
+ manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, null, true, false, true);
+ }
+ result = true;
+ }
} catch (ActiveMQQueueExistsException e) {
// ignore
} catch (Exception e) {
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
}
+
+ return result;
+ }
+
+ public void checkRoutingSemantics(String destination, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
+ AddressInfo.RoutingType actualRoutingTypeOfAddress = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType();
+ if (routingType != null && !routingType.equals(actualRoutingTypeOfAddress)) {
+ throw BUNDLE.illegalSemantics(routingType.toString(), actualRoutingTypeOfAddress.toString());
+ }
}
@Override
@@ -560,7 +582,7 @@ public final class StompConnection implements RemotingConnection {
if (stompSession.isNoLocal()) {
message.putStringProperty(CONNECTION_ID_PROP, getID().toString());
}
- if (enableMessageID()) {
+ if (isEnableMessageID()) {
message.putStringProperty("amqMessageId", "STOMP" + message.getMessageID());
}
try {
@@ -617,8 +639,11 @@ public final class StompConnection implements RemotingConnection {
String ack,
String id,
String durableSubscriptionName,
- boolean noLocal) throws ActiveMQStompException {
- autoCreateDestinationIfPossible(destination);
+ boolean noLocal,
+ AddressInfo.RoutingType subscriptionType) throws ActiveMQStompException {
+ autoCreateDestinationIfPossible(destination, subscriptionType);
+ checkDestination(destination);
+ checkRoutingSemantics(destination, subscriptionType);
if (noLocal) {
String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
if (selector == null) {
@@ -643,11 +668,11 @@ public final class StompConnection implements RemotingConnection {
}
try {
- manager.createSubscription(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal);
+ manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal);
} catch (ActiveMQStompException e) {
throw e;
} catch (Exception e) {
- throw BUNDLE.errorCreatSubscription(subscriptionID, e).setHandler(frameHandler);
+ throw BUNDLE.errorCreatingSubscription(subscriptionID, e).setHandler(frameHandler);
}
}
@@ -657,7 +682,7 @@ public final class StompConnection implements RemotingConnection {
} catch (ActiveMQStompException e) {
throw e;
} catch (Exception e) {
- throw BUNDLE.errorUnsubscrib(subscriptionID, e).setHandler(frameHandler);
+ throw BUNDLE.errorUnsubscribing(subscriptionID, e).setHandler(frameHandler);
}
}
@@ -710,7 +735,7 @@ public final class StompConnection implements RemotingConnection {
return this.frameHandler;
}
- public boolean enableMessageID() {
+ public boolean isEnableMessageID() {
return enableMessageID;
}
@@ -718,6 +743,14 @@ public final class StompConnection implements RemotingConnection {
return minLargeMessageSize;
}
+ public String getAnycastPrefix() {
+ return anycastPrefix;
+ }
+
+ public String getMulticastPrefix() {
+ return multicastPrefix;
+ }
+
public StompProtocolManager getManager() {
return manager;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 6029b37..0c1f7dd 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -368,13 +368,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
// Inner classes -------------------------------------------------
- public void createSubscription(StompConnection connection,
- String subscriptionID,
- String durableSubscriptionName,
- String destination,
- String selector,
- String ack,
- boolean noLocal) throws Exception {
+ public void subscribe(StompConnection connection,
+ String subscriptionID,
+ String durableSubscriptionName,
+ String destination,
+ String selector,
+ String ack,
+ boolean noLocal) throws Exception {
StompSession stompSession = getSession(connection);
stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID)) {
@@ -411,7 +411,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
public boolean destinationExists(String destination) {
- return server.getPostOffice().getAddresses().contains(SimpleString.toSimpleString(destination));
+ return server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(destination)) != null;
}
public ActiveMQServer getServer() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 003865c..580bade 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.artemis.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -167,8 +168,11 @@ public abstract class VersionedStompFrameHandler {
StompFrame response = null;
try {
connection.validate();
- String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
- checkDestination(destination);
+ String destination = getDestination(frame);
+ AddressInfo.RoutingType routingType = getRoutingType(frame.getHeader(Headers.Send.DESTINATION_TYPE), frame.getHeader(Headers.Send.DESTINATION));
+ connection.autoCreateDestinationIfPossible(destination, routingType);
+ connection.checkDestination(destination);
+ connection.checkRoutingSemantics(destination, routingType);
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
long timestamp = System.currentTimeMillis();
@@ -197,10 +201,6 @@ public abstract class VersionedStompFrameHandler {
return response;
}
- private void checkDestination(String destination) throws ActiveMQStompException {
- connection.checkDestination(destination);
- }
-
public StompFrame onBegin(StompFrame frame) {
StompFrame response = null;
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
@@ -238,7 +238,7 @@ public abstract class VersionedStompFrameHandler {
public StompFrame onSubscribe(StompFrame request) {
StompFrame response = null;
- String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+ String destination = getDestination(request);
String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
@@ -247,6 +247,7 @@ public abstract class VersionedStompFrameHandler {
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
+ AddressInfo.RoutingType routingType = getRoutingType(request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), request.getHeader(Headers.Subscribe.DESTINATION));
boolean noLocal = false;
if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
@@ -254,7 +255,7 @@ public abstract class VersionedStompFrameHandler {
}
try {
- connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+ connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
} catch (ActiveMQStompException e) {
response = e.getFrame();
}
@@ -262,6 +263,17 @@ public abstract class VersionedStompFrameHandler {
return response;
}
+ public String getDestination(StompFrame request) {
+ String destination = request.getHeader(Headers.Subscribe.DESTINATION);
+ if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) {
+ destination = destination.substring(connection.getMulticastPrefix().length());
+ } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) {
+ destination = destination.substring(connection.getAnycastPrefix().length());
+ }
+
+ return destination;
+ }
+
public StompFrame postprocess(StompFrame request) {
StompFrame response = null;
if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED)) {
@@ -332,4 +344,19 @@ public abstract class VersionedStompFrameHandler {
connection.destroy();
}
+ private AddressInfo.RoutingType getRoutingType(String typeHeader, String destination) {
+ // null is valid to return here so we know when the user didn't provide any routing info
+ AddressInfo.RoutingType routingType = null;
+ if (typeHeader != null) {
+ routingType = AddressInfo.RoutingType.valueOf(typeHeader);
+ } else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) {
+ if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) {
+ routingType = AddressInfo.RoutingType.MULTICAST;
+ } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) {
+ routingType = AddressInfo.RoutingType.ANYCAST;
+ }
+ }
+ return routingType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 5808bd3..23b8e32 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -276,7 +276,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
message.getBodyBuffer().writeBytes(Base64.decode(body));
}
message.setAddress(addressInfo.getName());
- postOffice.route(message, null, true);
+ postOffice.route(message, true);
return "" + message.getMessageID();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 7a1bb26..c4d25ac 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -728,7 +728,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
- postOffice.route(message, null, true);
+ postOffice.route(message, true);
return "" + message.getMessageID();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 2299e20..12eac9f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1268,7 +1268,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
@Override
public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception {
- PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType(), addressInfo.getDefaultMaxQueueConsumers());
+ PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(),
+ addressInfo.getRoutingType(),
+ addressInfo.getDefaultMaxQueueConsumers(),
+ addressInfo.isDefaultDeleteOnNoConsumers(),
+ addressInfo.isAutoCreated());
readLock();
try {
@@ -1398,7 +1402,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager {
idGenerator.loadState(record.id, buffer);
} else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) {
PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer);
- ActiveMQServerLogger.LOGGER.info("=== Loading: " + bindingEncoding);
addressBindingInfos.add(bindingEncoding);
} else if (rec == JournalRecordIds.GROUP_RECORD) {
GroupingEncoding encoding = newGroupEncoding(id, buffer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
index 3821b34..e47a210 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java
@@ -31,6 +31,10 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
public int defaultMaxConsumers;
+ public boolean defaultDeleteOnNoConsumers;
+
+ public boolean autoCreated;
+
public AddressInfo.RoutingType routingType;
public PersistentAddressBindingEncoding() {
@@ -45,15 +49,23 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
routingType +
", defaultMaxConsumers=" +
defaultMaxConsumers +
+ ", defaultDeleteOnNoConsumers=" +
+ defaultDeleteOnNoConsumers +
+ ", autoCreated=" +
+ autoCreated +
"]";
}
public PersistentAddressBindingEncoding(final SimpleString name,
final AddressInfo.RoutingType routingType,
- final int defaultMaxConsumers) {
+ final int defaultMaxConsumers,
+ final boolean defaultDeleteOnNoConsumers,
+ final boolean autoCreated) {
this.name = name;
this.routingType = routingType;
this.defaultMaxConsumers = defaultMaxConsumers;
+ this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
+ this.autoCreated = autoCreated;
}
@Override
@@ -85,6 +97,8 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
name = buffer.readSimpleString();
routingType = AddressInfo.RoutingType.getType(buffer.readByte());
defaultMaxConsumers = buffer.readInt();
+ defaultDeleteOnNoConsumers = buffer.readBoolean();
+ autoCreated = buffer.readBoolean();
}
@Override
@@ -92,10 +106,12 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres
buffer.writeSimpleString(name);
buffer.writeByte(routingType.getType());
buffer.writeInt(defaultMaxConsumers);
+ buffer.writeBoolean(defaultDeleteOnNoConsumers);
+ buffer.writeBoolean(autoCreated);
}
@Override
public int getEncodeSize() {
- return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT;
+ return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BOOLEAN;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index bc8a6cf..f1225c1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -24,7 +24,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -77,26 +76,22 @@ public interface PostOffice extends ActiveMQComponent {
Map<SimpleString, Binding> getAllBindings();
- RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
+ RoutingStatus route(ServerMessage message, boolean direct) throws Exception;
RoutingStatus route(ServerMessage message,
- QueueCreator queueCreator,
Transaction tx,
boolean direct) throws Exception;
RoutingStatus route(ServerMessage message,
- QueueCreator queueCreator,
Transaction tx,
boolean direct,
boolean rejectDuplicates) throws Exception;
RoutingStatus route(ServerMessage message,
- QueueCreator queueCreator,
RoutingContext context,
boolean direct) throws Exception;
RoutingStatus route(ServerMessage message,
- QueueCreator queueCreator,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 1dba309..135597f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -63,7 +63,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -441,6 +440,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public AddressInfo removeAddressInfo(SimpleString address) {
+ try {
+ getServer().getManagementService().unregisterAddress(address);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return addressManager.removeAddressInfo(address);
}
@@ -595,39 +599,34 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public RoutingStatus route(final ServerMessage message,
- QueueCreator queueCreator,
final boolean direct) throws Exception {
- return route(message, queueCreator, (Transaction) null, direct);
+ return route(message, (Transaction) null, direct);
}
@Override
public RoutingStatus route(final ServerMessage message,
- QueueCreator queueCreator,
final Transaction tx,
final boolean direct) throws Exception {
- return route(message, queueCreator, new RoutingContextImpl(tx), direct);
+ return route(message, new RoutingContextImpl(tx), direct);
}
@Override
public RoutingStatus route(final ServerMessage message,
- final QueueCreator queueCreator,
final Transaction tx,
final boolean direct,
final boolean rejectDuplicates) throws Exception {
- return route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
+ return route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
}
@Override
public RoutingStatus route(final ServerMessage message,
- final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct) throws Exception {
- return route(message, queueCreator, context, direct, true);
+ return route(message, context, direct, true);
}
@Override
public RoutingStatus route(final ServerMessage message,
- final QueueCreator queueCreator,
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
@@ -657,14 +656,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+ // TODO auto-create queues here?
// first check for the auto-queue creation thing
- if (bindings == null && queueCreator != null) {
+ if (bindings == null) {
// There is no queue with this address, we will check if it needs to be created
- if (queueCreator.create(address)) {
+// if (queueCreator.create(address)) {
// TODO: this is not working!!!!
// reassign bindings if it was created
- bindings = addressManager.getBindingsForRoutingAddress(address);
- }
+// bindings = addressManager.getBindingsForRoutingAddress(address);
+// }
}
if (bindings != null) {
@@ -704,7 +704,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress);
- route(message, null, context.getTransaction(), false);
+ route(message, context.getTransaction(), false);
result = RoutingStatus.NO_BINDINGS_DLA;
}
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index f716847..84f554d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -241,30 +241,6 @@ public interface ActiveMQServer extends ActiveMQComponent {
long getUptimeMillis();
/**
- * This is the queue creator responsible for automatic JMS Queue creations.
- *
- * @param queueCreator
- */
- void setJMSQueueCreator(QueueCreator queueCreator);
-
- /**
- * @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator)
- */
- QueueCreator getJMSDestinationCreator();
-
- /**
- * This is the queue deleter responsible for automatic JMS Queue deletions.
- *
- * @param queueDeleter
- */
- void setJMSQueueDeleter(QueueDeleter queueDeleter);
-
- /**
- * @see org.apache.activemq.artemis.core.server.ActiveMQServer#setJMSQueueDeleter(QueueDeleter)
- */
- QueueDeleter getJMSQueueDeleter();
-
- /**
* Returns whether the initial replication synchronization process with the backup server is complete; applicable for
* either the live or backup server.
*/
@@ -369,7 +345,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
QueueQueryResult queueQuery(SimpleString name) throws Exception;
Queue deployQueue(SimpleString address,
- SimpleString resourceName,
+ SimpleString queueName,
SimpleString filterString,
boolean durable,
boolean temporary,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java
deleted file mode 100644
index f89a2b0..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueCreator.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-
-public interface QueueCreator {
-
- /**
- * You should return true if you even tried to create the queue and the queue was already there.
- * As the callers of this method will use that as an indicator that they should re-route the messages.
- * *
- *
- * @return True if a queue was created.
- */
- boolean create(SimpleString address) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java
deleted file mode 100644
index d062848..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueDeleter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-
-public interface QueueDeleter {
-
- /**
- * @return True if a queue was deleted.
- */
- boolean delete(SimpleString queueName) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
index 64e7a5d..2557b73 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
@@ -29,7 +29,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
*/
public interface QueueFactory {
- Queue createQueueWith(final QueueConfig config);
+ Queue createQueueWith(final QueueConfig config) throws Exception;
/**
* @deprecated Replaced by {@link #createQueueWith}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 910eb22..28d283d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -87,8 +87,6 @@ public interface ServerSession extends SecurityAuth {
void markTXFailed(Throwable e);
- QueueCreator getQueueCreator();
-
List<Xid> xaGetInDoubtXids();
int xaGetTimeout();
@@ -194,7 +192,8 @@ public interface ServerSession extends SecurityAuth {
boolean temporary,
boolean durable,
Integer maxConsumers,
- Boolean deleteOnNoConsumers) throws Exception;
+ Boolean deleteOnNoConsumers,
+ final Boolean autoCreated) throws Exception;
void createSharedQueue(SimpleString address,
SimpleString name,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 285bf3b..7aa802b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -119,8 +119,6 @@ import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
-import org.apache.activemq.artemis.core.server.QueueCreator;
-import org.apache.activemq.artemis.core.server.QueueDeleter;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
@@ -273,16 +271,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private FileStoreMonitor fileStoreMonitor;
- /**
- * This will be set by the JMS Queue Manager.
- */
- private QueueCreator jmsQueueCreator;
-
- /**
- * This will be set by the JMS Queue Manager.
- */
- private QueueDeleter jmsQueueDeleter;
-
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<>();
private final Semaphore activationLock = new Semaphore(1);
@@ -721,11 +709,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
- if (autoCreateJmsTopics) {
- putAddressInfoIfAbsent(new AddressInfo(address));
- }
-
- return new BindingQueryResult(getAddressInfo(address) != null, names, autoCreateJmsQueues, autoCreateJmsTopics);
+ return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues, autoCreateJmsTopics);
}
@Override
@@ -794,26 +778,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
- public QueueCreator getJMSDestinationCreator() {
- return jmsQueueCreator;
- }
-
- @Override
- public void setJMSQueueCreator(QueueCreator jmsQueueCreator) {
- this.jmsQueueCreator = jmsQueueCreator;
- }
-
- @Override
- public QueueDeleter getJMSQueueDeleter() {
- return jmsQueueDeleter;
- }
-
- @Override
- public void setJMSQueueDeleter(QueueDeleter jmsQueueDeleter) {
- this.jmsQueueDeleter = jmsQueueDeleter;
- }
-
- @Override
public boolean isReplicaSync() {
if (activation instanceof SharedNothingLiveActivation) {
ReplicationManager replicationManager = getReplicationManager();
@@ -1358,7 +1322,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SessionCallback callback,
OperationContext context,
boolean autoCreateJMSQueues) throws Exception {
- return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, pagingManager);
+ return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager);
}
@Override
@@ -1616,17 +1580,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public Queue deployQueue(final SimpleString address,
- final SimpleString resourceName,
+ final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
- return deployQueue(address, resourceName, filterString, durable, temporary, autoCreated, null, null);
+ return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null);
}
@Override
public Queue deployQueue(final SimpleString address,
- final SimpleString resourceName,
+ final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
@@ -1635,9 +1599,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final Boolean deleteOnNoConsumers) throws Exception {
// TODO: fix logging here as this could be for a topic or queue
- ActiveMQServerLogger.LOGGER.deployQueue(resourceName);
+ ActiveMQServerLogger.LOGGER.deployQueue(queueName);
- return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers);
+ return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers);
}
@Override
@@ -2137,6 +2101,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Deploy any predefined queues
deployQueuesFromConfiguration();
+ registerPostQueueDeletionCallback(new PostQueueDeletionCallback() {
+ // TODO delete auto-created addresses when queueCount == 0
+ @Override
+ public void callback(SimpleString address, SimpleString queueName) throws Exception {
+ if (getAddressInfo(address).isAutoCreated() && postOffice.getBindingsForAddress(address).getBindings().size() == 0) {
+ removeAddressInfo(address);
+ }
+ }
+ });
+
// We need to call this here, this gives any dependent server a chance to deploy its own addresses
// this needs to be done before clustering is fully activated
callActivateCallbacks();
@@ -2408,7 +2382,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean autoCreated,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers) throws Exception {
-
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
if (ignoreIfExists) {
@@ -2465,7 +2438,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else if (queue.isAutoCreated()) {
- queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName()));
+ queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName()));
}
final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 708aeda..6ad40fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -29,6 +29,8 @@ public class AddressInfo {
private int defaultMaxQueueConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
+ private boolean autoCreated = false;
+
public AddressInfo(SimpleString name) {
this.name = name;
}
@@ -67,6 +69,15 @@ public class AddressInfo {
return this;
}
+ public boolean isAutoCreated() {
+ return autoCreated;
+ }
+
+ public AddressInfo setAutoCreated(boolean autoCreated) {
+ this.autoCreated = autoCreated;
+ return this;
+ }
+
public SimpleString getName() {
return name;
}
@@ -78,6 +89,7 @@ public class AddressInfo {
buff.append(", routingType=" + routingType);
buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers);
buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers);
+ buff.append(", autoCreated=" + autoCreated);
buff.append("]");
return buff.toString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
index 535e53b..a211a96 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AutoCreatedQueueManagerImpl.java
@@ -17,34 +17,48 @@
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AutoCreatedQueueManager;
-import org.apache.activemq.artemis.core.server.QueueDeleter;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager {
private final SimpleString queueName;
- private final QueueDeleter deleter;
+ private final ActiveMQServer server;
private final Runnable runnable = new Runnable() {
@Override
public void run() {
- try {
- if (deleter != null) {
- deleter.delete(queueName);
+ Queue queue = server.locateQueue(queueName);
+ SimpleString address = queue.getAddress();
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+ long consumerCount = queue.getConsumerCount();
+ long messageCount = queue.getMessageCount();
+
+ // TODO make sure this is the right check
+ if ((queue.isAutoCreated() || queue.isDeleteOnNoConsumers()) && queue.getMessageCount() == 0) {
+ if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+ ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; getAutoDeleteJmsQueues = " + settings.getAutoDeleteJmsQueues());
+ }
+
+ // TODO handle this exception better
+ try {
+ server.destroyQueue(queueName, null, true, false);
+ } catch (Exception e) {
+ e.printStackTrace();
}
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
}
}
};
private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
- public AutoCreatedQueueManagerImpl(QueueDeleter deleter, SimpleString queueName) {
- this.deleter = deleter;
+ public AutoCreatedQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
+ this.server = server;
this.queueName = queueName;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index e583fc0..5782379 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -104,7 +104,7 @@ public class DivertImpl implements Divert {
copy = message;
}
- postOffice.route(copy, null, context.getTransaction(), false);
+ postOffice.route(copy, context.getTransaction(), false);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 76fc69b..eb31737 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -151,9 +151,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
.deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers())
.maxConsumers(queueBindingInfo.getMaxConsumers());
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
- if (queue.isAutoCreated()) {
- queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
- }
+ queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
if (queueBindingInfo.getQueueStatusEncodings() != null) {
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index c391b90..7c614ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2326,7 +2326,7 @@ public class QueueImpl implements Queue {
copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
}
- postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
+ postOffice.route(copyMessage, tx, false, rejectDuplicate);
acknowledge(tx, ref);
}
@@ -2530,7 +2530,7 @@ public class QueueImpl implements Queue {
copyMessage.setAddress(address);
- postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
+ postOffice.route(copyMessage, tx, false, rejectDuplicate);
acknowledge(tx, ref, reason);