You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2019/02/07 21:18:57 UTC
[activemq-artemis] branch 2.6.x updated: ARTEMIS-2081
listConfiguredQueues returns only queues created by config
This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new 6455990 ARTEMIS-2081 listConfiguredQueues returns only queues created by config
6455990 is described below
commit 6455990d4f34b5dad4e02ac43ec49f2a9468d0a3
Author: Michael André Pearce <mi...@me.com>
AuthorDate: Thu Feb 7 18:06:40 2019 +0000
ARTEMIS-2081 listConfiguredQueues returns only queues created by config
Extend test case to reproduce problem of client created queues being incorrectly removed on simple reload of config.
Add a flag/field to the queues created by configuration/broker.xml so we can correctly filter only queues created/managed by config.
Update listConfiguredQueues to use the new queue flag
---
.../artemis/api/core/management/QueueControl.java | 6 +++
.../core/management/impl/QueueControlImpl.java | 12 ++++++
.../artemis/core/persistence/QueueBindingInfo.java | 4 ++
.../journal/AbstractJournalStorageManager.java | 2 +-
.../codec/PersistentQueueBindingEncoding.java | 25 ++++++++++-
.../artemis/core/postoffice/PostOffice.java | 3 +-
.../core/postoffice/impl/PostOfficeImpl.java | 7 ++-
.../apache/activemq/artemis/core/server/Queue.java | 4 ++
.../activemq/artemis/core/server/QueueConfig.java | 24 +++++++++--
.../core/server/impl/ActiveMQServerImpl.java | 50 +++++++++++++++++++---
.../artemis/core/server/impl/LastValueQueue.java | 3 +-
.../core/server/impl/PostOfficeJournalLoader.java | 3 +-
.../artemis/core/server/impl/QueueFactoryImpl.java | 7 ++-
.../artemis/core/server/impl/QueueImpl.java | 37 ++++++++++++++++
.../server/impl/ScheduledDeliveryHandlerTest.java | 10 +++++
.../tests/integration/jms/RedeployTest.java | 10 +++++
.../management/QueueControlUsingCoreTest.java | 5 +++
.../resources/reload-address-queues-updated.xml | 4 ++
.../src/test/resources/reload-address-queues.xml | 4 ++
.../tests/unit/core/postoffice/impl/FakeQueue.java | 10 +++++
.../core/server/impl/fakes/FakePostOffice.java | 3 +-
21 files changed, 214 insertions(+), 19 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 2aafcb1..917e372 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -225,6 +225,12 @@ public interface QueueControl {
/**
*
*/
+ @Attribute(desc = "is this queue managed by configuration (broker.xml)")
+ boolean isConfigurationManaged();
+
+ /**
+ *
+ */
@Attribute(desc = "If the queue should route exclusively to one consumer")
boolean isExclusive();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 14abeb1..4914ed2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -484,6 +484,18 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
+ public boolean isConfigurationManaged() {
+ checkStarted();
+
+ clearIO();
+ try {
+ return queue.isConfigurationManaged();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
public boolean isExclusive() {
checkStarted();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 7e18311..7b84c3d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -40,6 +40,10 @@ public interface QueueBindingInfo {
boolean isAutoCreated();
+ boolean isConfigurationManaged();
+
+ void setConfigurationManaged(boolean configurationManaged);
+
SimpleString getUser();
void addQueueStatusEncoding(QueueStatusEncoding status);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 842c924..9c51208 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1296,7 +1296,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
- PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getRoutingType().getType());
+ PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
readLock();
try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 2ab4396..f4f103b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -52,6 +52,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public byte routingType;
+ public boolean configurationManaged;
+
public PersistentQueueBindingEncoding() {
}
@@ -78,6 +80,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
lastValue +
", routingType=" +
routingType +
+ ", configurationManaged=" +
+ configurationManaged +
"]";
}
@@ -90,7 +94,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final boolean purgeOnNoConsumers,
final boolean exclusive,
final boolean lastValue,
- final byte routingType) {
+ final byte routingType,
+ final boolean configurationManaged) {
this.name = name;
this.address = address;
this.filterString = filterString;
@@ -101,6 +106,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.exclusive = exclusive;
this.lastValue = lastValue;
this.routingType = routingType;
+ this.configurationManaged = configurationManaged;
}
@Override
@@ -143,6 +149,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
@Override
+ public boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
+
+ @Override
+ public void setConfigurationManaged(boolean configurationManaged) {
+ this.configurationManaged = configurationManaged;
+ }
+
+ @Override
public void addQueueStatusEncoding(QueueStatusEncoding status) {
if (queueStatusEncodings == null) {
queueStatusEncodings = new LinkedList<>();
@@ -246,6 +262,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
}
+ if (buffer.readableBytes() > 0) {
+ configurationManaged = buffer.readBoolean();
+ } else {
+ configurationManaged = false;
+ }
}
@Override
@@ -260,6 +281,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeByte(routingType);
buffer.writeBoolean(exclusive);
buffer.writeBoolean(lastValue);
+ buffer.writeBoolean(configurationManaged);
}
@Override
@@ -271,6 +293,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BYTE +
DataConstants.SIZE_BOOLEAN +
+ DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 19ddd94..e5a6dce 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -68,7 +68,8 @@ public interface PostOffice extends ActiveMQComponent {
RoutingType routingType,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
- Boolean exclusive) throws Exception;
+ Boolean exclusive,
+ Boolean configurationManaged) throws Exception;
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 66225d2..b3b2461 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -467,7 +467,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
RoutingType routingType,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
- Boolean exclusive) throws Exception {
+ Boolean exclusive,
+ Boolean configurationManaged) throws Exception {
synchronized (addressLock) {
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
if (queueBinding == null) {
@@ -511,6 +512,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
changed = true;
queue.setExclusive(exclusive);
}
+ if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
+ changed = true;
+ queue.setConfigurationManaged(configurationManaged);
+ }
if (changed) {
final long txID = storageManager.generateID();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index c549be9..1354d76 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -78,6 +78,10 @@ public interface Queue extends Bindable,CriticalComponent {
void setMaxConsumer(int maxConsumers);
+ boolean isConfigurationManaged();
+
+ void setConfigurationManaged(boolean configurationManaged);
+
void addConsumer(Consumer consumer) throws Exception;
void removeConsumer(Consumer consumer);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index 6b1c284..7cf4e63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -41,6 +41,7 @@ public final class QueueConfig {
private final boolean exclusive;
private final boolean lastValue;
private final boolean purgeOnNoConsumers;
+ private final boolean configurationManaged;
public static final class Builder {
@@ -58,6 +59,7 @@ public final class QueueConfig {
private boolean exclusive;
private boolean lastValue;
private boolean purgeOnNoConsumers;
+ private boolean configurationManaged;
private Builder(final long id, final SimpleString name) {
this(id, name, name);
@@ -78,6 +80,7 @@ public final class QueueConfig {
this.exclusive = ActiveMQDefaultConfiguration.getDefaultExclusive();
this.lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
+ this.configurationManaged = false;
validateState();
}
@@ -94,6 +97,11 @@ public final class QueueConfig {
}
}
+ public Builder configurationManaged(final boolean configurationManaged) {
+ this.configurationManaged = configurationManaged;
+ return this;
+ }
+
public Builder filter(final Filter filter) {
this.filter = filter;
return this;
@@ -176,7 +184,7 @@ public final class QueueConfig {
} else {
pageSubscription = null;
}
- return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, purgeOnNoConsumers);
+ return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, purgeOnNoConsumers, configurationManaged);
}
}
@@ -222,7 +230,8 @@ public final class QueueConfig {
final int maxConsumers,
final boolean exclusive,
final boolean lastValue,
- final boolean purgeOnNoConsumers) {
+ final boolean purgeOnNoConsumers,
+ final boolean configurationManaged) {
this.id = id;
this.address = address;
this.name = name;
@@ -237,6 +246,7 @@ public final class QueueConfig {
this.exclusive = exclusive;
this.lastValue = lastValue;
this.maxConsumers = maxConsumers;
+ this.configurationManaged = configurationManaged;
}
public long id() {
@@ -295,6 +305,10 @@ public final class QueueConfig {
return routingType;
}
+ public boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -330,6 +344,8 @@ public final class QueueConfig {
return false;
if (purgeOnNoConsumers != that.purgeOnNoConsumers)
return false;
+ if (configurationManaged != that.configurationManaged)
+ return false;
return user != null ? user.equals(that.user) : that.user == null;
}
@@ -350,6 +366,7 @@ public final class QueueConfig {
result = 31 * result + (exclusive ? 1 : 0);
result = 31 * result + (lastValue ? 1 : 0);
result = 31 * result + (purgeOnNoConsumers ? 1 : 0);
+ result = 31 * result + (configurationManaged ? 1 : 0);
return result;
}
@@ -369,6 +386,7 @@ public final class QueueConfig {
+ ", maxConsumers=" + maxConsumers
+ ", exclusive=" + exclusive
+ ", lastValue=" + lastValue
- + ", purgeOnNoConsumers=" + purgeOnNoConsumers + '}';
+ + ", purgeOnNoConsumers=" + purgeOnNoConsumers
+ + ", configurationManaged=" + configurationManaged + '}';
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 80e4217..2d04ebf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2538,7 +2538,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
private List<Queue> listConfiguredQueues(SimpleString address) throws Exception {
- return listQueues(address).stream().filter(queue -> !queue.isAutoCreated() && !queue.isInternalQueue()).collect(Collectors.toList());
+ return listQueues(address).stream().filter(queue -> queue.isConfigurationManaged()).collect(Collectors.toList());
}
private List<Queue> listQueues(SimpleString address) throws Exception {
@@ -2597,7 +2597,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
int maxConsumer = (config.isMaxConsumerConfigured()) ? maxConsumerQueueConfig : maxConsumerAddressSetting;
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
updateQueue(config.getName(), config.getRoutingType(), maxConsumer, config.getPurgeOnNoConsumers(),
- config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive());
+ config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(), true);
} else {
// if the address::queue doesn't exist then create it
try {
@@ -2605,7 +2605,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()),
config.isDurable(),false,false,false,false,maxConsumer,config.getPurgeOnNoConsumers(),
config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(),
- config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(), true);
+ config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(), true, true);
} catch (ActiveMQQueueExistsException e) {
// the queue may exist on a *different* address
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
@@ -2908,6 +2908,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean exclusive,
final boolean lastValue,
final boolean autoCreateAddress) throws Exception {
+ return createQueue(address, routingType, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, autoCreateAddress, false);
+ }
+
+ private Queue createQueue(final SimpleString address,
+ final RoutingType routingType,
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final SimpleString user,
+ final boolean durable,
+ final boolean temporary,
+ final boolean ignoreIfExists,
+ final boolean transientQueue,
+ final boolean autoCreated,
+ final int maxConsumers,
+ final boolean purgeOnNoConsumers,
+ final boolean exclusive,
+ final boolean lastValue,
+ final boolean autoCreateAddress,
+ final boolean configurationManaged) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
@@ -2948,7 +2967,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, info.getName().toString(), info.getRoutingTypes());
}
- final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).exclusive(exclusive).lastValue(lastValue).build();
+ final QueueConfig queueConfig = queueConfigBuilder
+ .filter(filter)
+ .pagingManager(pagingManager)
+ .user(user)
+ .durable(durable)
+ .temporary(temporary)
+ .autoCreated(autoCreated).routingType(routingType)
+ .maxConsumers(maxConsumers)
+ .purgeOnNoConsumers(purgeOnNoConsumers)
+ .exclusive(exclusive)
+ .lastValue(lastValue)
+ .configurationManaged(configurationManaged)
+ .build();
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
@@ -3013,7 +3044,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive) throws Exception {
- final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive);
+ return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, false);
+ }
+
+ private Queue updateQueue(String name,
+ RoutingType routingType,
+ Integer maxConsumers,
+ Boolean purgeOnNoConsumers,
+ Boolean exclusive,
+ Boolean configurationManaged) throws Exception {
+ final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, configurationManaged);
if (queueBinding != null) {
final Queue queue = queueBinding.getQueue();
return queue;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index fc96591..302e509 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -64,6 +64,7 @@ public class LastValueQueue extends QueueImpl {
final Integer maxConsumers,
final Boolean exclusive,
final Boolean purgeOnNoConsumers,
+ final boolean configurationManaged,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -71,7 +72,7 @@ public class LastValueQueue extends QueueImpl {
final ArtemisExecutor executor,
final ActiveMQServer server,
final QueueFactory factory) {
- super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+ super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, purgeOnNoConsumers, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 0132818..d2adabb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -153,7 +153,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
.maxConsumers(queueBindingInfo.getMaxConsumers())
.exclusive(queueBindingInfo.isExclusive())
.lastValue(queueBindingInfo.isLastValue())
- .routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
+ .routingType(RoutingType.getType(queueBindingInfo.getRoutingType()))
+ .configurationManaged((queueBindingInfo.isConfigurationManaged()));
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 7f23d09..bf47e95 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -74,11 +74,10 @@ public class QueueFactoryImpl implements QueueFactory {
public Queue createQueueWith(final QueueConfig config) {
final Queue queue;
if (config.isLastValue()) {
- queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+ queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else {
- queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+ queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
}
-
server.getCriticalAnalyzer().add(queue);
return queue;
}
@@ -102,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (addressSettings.isDefaultLastValueQueue()) {
- queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+ queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else {
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 76cb44f..27f9a16 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -269,6 +269,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final QueueFactory factory;
+ private volatile boolean configurationManaged;
/**
* This is to avoid multi-thread races on calculating direct delivery,
* to guarantee ordering will be always be correct
@@ -414,6 +415,30 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final ArtemisExecutor executor,
final ActiveMQServer server,
final QueueFactory factory) {
+ this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, purgeOnNoConsumers, false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+ }
+
+ public QueueImpl(final long id,
+ final SimpleString address,
+ final SimpleString name,
+ final Filter filter,
+ final PageSubscription pageSubscription,
+ final SimpleString user,
+ final boolean durable,
+ final boolean temporary,
+ final boolean autoCreated,
+ final RoutingType routingType,
+ final Integer maxConsumers,
+ final Boolean exclusive,
+ final Boolean purgeOnNoConsumers,
+ final boolean configurationManaged,
+ final ScheduledExecutorService scheduledExecutor,
+ final PostOffice postOffice,
+ final StorageManager storageManager,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final ArtemisExecutor executor,
+ final ActiveMQServer server,
+ final QueueFactory factory) {
super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS);
this.id = id;
@@ -442,6 +467,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.purgeOnNoConsumers = purgeOnNoConsumers == null ? ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers() : purgeOnNoConsumers;
+ this.configurationManaged = configurationManaged;
+
this.postOffice = postOffice;
this.storageManager = storageManager;
@@ -572,6 +599,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
+ public boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
+
+ @Override
+ public synchronized void setConfigurationManaged(boolean configurationManaged) {
+ this.configurationManaged = configurationManaged;
+ }
+
+ @Override
public SimpleString getName() {
return name;
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 9005898..02070ca4 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -799,6 +799,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public boolean isConfigurationManaged() {
+ return false;
+ }
+
+ @Override
+ public void setConfigurationManaged(boolean configurationManaged) {
+
+ }
+
+ @Override
public void recheckRefCount(OperationContext context) {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index e892d93..3637c80 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -242,8 +242,15 @@ public class RedeployTest extends ActiveMQTestBase {
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
+ try (JMSContext jmsContext = connectionFactory.createContext()) {
+ jmsContext.createSharedDurableConsumer(jmsContext.createTopic("config_test_consumer_created_queues"),"mySub").receive(100);
+ }
+
try {
latch.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_consumer_created_queues").contains("mySub"));
+
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_address_removal_no_queue"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal"));
@@ -266,6 +273,9 @@ public class RedeployTest extends ActiveMQTestBase {
embeddedJMS.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
+ //Ensure queues created by clients (NOT by broker.xml) are not removed when we reload.
+ Assert.assertTrue(listQueuesNamesForAddress(embeddedJMS, "config_test_consumer_created_queues").contains("mySub"));
+
Assert.assertNull(getAddressInfo(embeddedJMS, "config_test_address_removal_no_queue"));
Assert.assertNull(getAddressInfo(embeddedJMS, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedJMS, "config_test_queue_removal"));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index dcdb2f1..d9da4f3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -142,6 +142,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
@Override
+ public boolean isConfigurationManaged() { // fetches the "configurationManaged" attribute via the proxy
+ return (Boolean) proxy.retrieveAttributeValue("configurationManaged"); // cast + auto-unboxing: fails if the value is null or not a Boolean
+ }
+
+ @Override
public boolean isExclusive() {
return (Boolean) proxy.retrieveAttributeValue("exclusive");
}
diff --git a/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml b/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml
index f8d1d91..fd73f11 100644
--- a/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml
+++ b/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml
@@ -117,6 +117,10 @@ under the License.
</wildcard-addresses>
<addresses>
+ <address name="config_test_consumer_created_queues">
+ <multicast>
+ </multicast>
+ </address>
<address name="config_test_queue_removal">
<multicast>
<queue name="config_test_queue_removal_queue_1"/>
diff --git a/tests/integration-tests/src/test/resources/reload-address-queues.xml b/tests/integration-tests/src/test/resources/reload-address-queues.xml
index ebd0f4e..74c9d08 100644
--- a/tests/integration-tests/src/test/resources/reload-address-queues.xml
+++ b/tests/integration-tests/src/test/resources/reload-address-queues.xml
@@ -120,6 +120,10 @@ under the License.
</wildcard-addresses>
<addresses>
+ <address name="config_test_consumer_created_queues">
+ <multicast>
+ </multicast>
+ </address>
<address name="config_test_queue_removal">
<multicast>
<queue name="config_test_queue_removal_queue_1"/>
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 5c37aec..59caeec 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -71,6 +71,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public boolean isConfigurationManaged() { // FakeQueue stub: unconditionally answers false
+ return false;
+ }
+
+ @Override
+ public void setConfigurationManaged(boolean configurationManaged) {
+ // no-op: FakeQueue discards the supplied flag
+ }
+
+ @Override
public boolean isInternalQueue() {
// no-op
return false;
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 6402bff..925ed2f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -48,7 +48,8 @@ public class FakePostOffice implements PostOffice {
RoutingType routingType,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
- Boolean exclusive) throws Exception {
+ Boolean exclusive,
+ Boolean configurationManaged) throws Exception {
return null;
}