You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/11 19:44:27 UTC
[1/2] activemq-artemis git commit: ARTEMIS-2081 listConfiguredQueues
returns only queues created by config
Repository: activemq-artemis
Updated Branches:
refs/heads/master 71390fea4 -> 4507d7783
ARTEMIS-2081 listConfiguredQueues returns only queues created by config
Extend test case to reproduce problem of client created queues being incorrectly removed on simple reload of config.
Add a flag/field to the queues created by configuration/broker.xml so we can correctly filter only queues created/managed by config.
Update listConfiguredQueues to use the new queue flag
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c417d0b5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c417d0b5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c417d0b5
Branch: refs/heads/master
Commit: c417d0b5f8ac67b1c9bde67cd6d6300d4194e2f8
Parents: 71390fe
Author: Michael André Pearce <mi...@me.com>
Authored: Sat Sep 8 00:23:55 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 11 13:55:11 2018 -0400
----------------------------------------------------------------------
.../api/core/management/QueueControl.java | 6 +++
.../core/management/impl/QueueControlImpl.java | 12 ++++++
.../core/persistence/QueueBindingInfo.java | 4 ++
.../journal/AbstractJournalStorageManager.java | 2 +-
.../codec/PersistentQueueBindingEncoding.java | 27 ++++++++++++-
.../artemis/core/postoffice/PostOffice.java | 3 +-
.../core/postoffice/impl/PostOfficeImpl.java | 7 +++-
.../activemq/artemis/core/server/Queue.java | 4 ++
.../artemis/core/server/QueueConfig.java | 24 ++++++++++--
.../core/server/impl/ActiveMQServerImpl.java | 41 ++++++++++++++++++--
.../core/server/impl/LastValueQueue.java | 3 +-
.../server/impl/PostOfficeJournalLoader.java | 3 +-
.../core/server/impl/QueueFactoryImpl.java | 7 ++--
.../artemis/core/server/impl/QueueImpl.java | 21 ++++++++--
.../impl/ScheduledDeliveryHandlerTest.java | 10 +++++
.../tests/integration/jms/RedeployTest.java | 10 +++++
.../management/QueueControlUsingCoreTest.java | 5 +++
.../resources/reload-address-queues-updated.xml | 4 ++
.../test/resources/reload-address-queues.xml | 4 ++
.../unit/core/postoffice/impl/FakeQueue.java | 10 +++++
.../core/server/impl/fakes/FakePostOffice.java | 3 +-
21 files changed, 189 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index d213446..b210530 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -225,6 +225,12 @@ public interface QueueControl {
/**
*
*/
+ @Attribute(desc = "is this queue managed by configuration (broker.xml)")
+ boolean isConfigurationManaged();
+
+ /**
+ *
+ */
@Attribute(desc = "If the queue should route exclusively to one consumer")
boolean isExclusive();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 7377846..3db5cae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -490,6 +490,18 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
+ public boolean isConfigurationManaged() {
+ checkStarted();
+
+ clearIO();
+ try {
+ return queue.isConfigurationManaged();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
public boolean isExclusive() {
checkStarted();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index ebc86fc..9d7bb7e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -40,6 +40,10 @@ public interface QueueBindingInfo {
boolean isAutoCreated();
+ boolean isConfigurationManaged();
+
+ void setConfigurationManaged(boolean configurationManaged);
+
SimpleString getUser();
void addQueueStatusEncoding(QueueStatusEncoding status);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 8c3cc77..39511f1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1290,7 +1290,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
- PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType());
+ PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
readLock();
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 0cfe67c..a7d5216 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -56,6 +56,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public byte routingType;
+ public boolean configurationManaged;
+
public PersistentQueueBindingEncoding() {
}
@@ -86,6 +88,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
delayBeforeDispatch +
", routingType=" +
routingType +
+ ", configurationManaged=" +
+ configurationManaged +
"]";
}
@@ -100,7 +104,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final boolean lastValue,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
- final byte routingType) {
+ final byte routingType,
+ final boolean configurationManaged) {
this.name = name;
this.address = address;
this.filterString = filterString;
@@ -113,6 +118,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
this.routingType = routingType;
+ this.configurationManaged = configurationManaged;
}
@Override
@@ -155,6 +161,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
@Override
+ public boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
+
+ @Override
+ public void setConfigurationManaged(boolean configurationManaged) {
+ this.configurationManaged = configurationManaged;
+ }
+
+ @Override
public void addQueueStatusEncoding(QueueStatusEncoding status) {
if (queueStatusEncodings == null) {
queueStatusEncodings = new LinkedList<>();
@@ -288,6 +304,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
}
+ if (buffer.readableBytes() > 0) {
+ configurationManaged = buffer.readBoolean();
+ } else {
+ configurationManaged = false;
+ }
}
@Override
@@ -304,6 +325,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeBoolean(lastValue);
buffer.writeInt(consumersBeforeDispatch);
buffer.writeLong(delayBeforeDispatch);
+ buffer.writeBoolean(configurationManaged);
}
@Override
@@ -317,7 +339,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_INT +
- DataConstants.SIZE_LONG;
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BOOLEAN;
}
private SimpleString createMetadata() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index d31e33b..6ed91b4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -73,7 +73,8 @@ public interface PostOffice extends ActiveMQComponent {
Boolean exclusive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
- SimpleString user) throws Exception;
+ SimpleString user,
+ Boolean configurationManaged) throws Exception;
List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index d4cde18..02abf46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -471,7 +471,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Boolean exclusive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
- SimpleString user) throws Exception {
+ SimpleString user,
+ Boolean configurationManaged) throws Exception {
synchronized (addressLock) {
final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
if (queueBinding == null) {
@@ -527,6 +528,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
changed = true;
queue.setFilter(filter);
}
+ if (configurationManaged != null && !configurationManaged.equals(queue.isConfigurationManaged())) {
+ changed = true;
+ queue.setConfigurationManaged(configurationManaged);
+ }
if (logger.isDebugEnabled()) {
if (user == null && queue.getUser() != null) {
logger.debug("Ignoring updating Queue to a NULL user");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 63d39c7..a8f1095 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -94,6 +94,10 @@ public interface Queue extends Bindable,CriticalComponent {
void setMaxConsumer(int maxConsumers);
+ boolean isConfigurationManaged();
+
+ void setConfigurationManaged(boolean configurationManaged);
+
void addConsumer(Consumer consumer) throws Exception;
void removeConsumer(Consumer consumer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index c83e08a..c79114d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -42,6 +42,7 @@ public final class QueueConfig {
private final boolean purgeOnNoConsumers;
private final int consumersBeforeDispatch;
private final long delayBeforeDispatch;
+ private final boolean configurationManaged;
public static final class Builder {
@@ -61,6 +62,7 @@ public final class QueueConfig {
private boolean purgeOnNoConsumers;
private int consumersBeforeDispatch;
private long delayBeforeDispatch;
+ private boolean configurationManaged;
private Builder(final long id, final SimpleString name) {
this(id, name, name);
@@ -83,6 +85,7 @@ public final class QueueConfig {
this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
this.consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
+ this.configurationManaged = false;
validateState();
}
@@ -99,6 +102,11 @@ public final class QueueConfig {
}
}
+ public Builder configurationManaged(final boolean configurationManaged) {
+ this.configurationManaged = configurationManaged;
+ return this;
+ }
+
public Builder filter(final Filter filter) {
this.filter = filter;
return this;
@@ -185,7 +193,7 @@ public final class QueueConfig {
} else {
pageSubscription = null;
}
- return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers);
+ return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged);
}
}
@@ -233,7 +241,8 @@ public final class QueueConfig {
final boolean lastValue,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
- final boolean purgeOnNoConsumers) {
+ final boolean purgeOnNoConsumers,
+ final boolean configurationManaged) {
this.id = id;
this.address = address;
this.name = name;
@@ -250,6 +259,7 @@ public final class QueueConfig {
this.maxConsumers = maxConsumers;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
+ this.configurationManaged = configurationManaged;
}
public long id() {
@@ -316,6 +326,10 @@ public final class QueueConfig {
return delayBeforeDispatch;
}
+ public boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -357,6 +371,8 @@ public final class QueueConfig {
return false;
if (purgeOnNoConsumers != that.purgeOnNoConsumers)
return false;
+ if (configurationManaged != that.configurationManaged)
+ return false;
return user != null ? user.equals(that.user) : that.user == null;
}
@@ -379,6 +395,7 @@ public final class QueueConfig {
result = 31 * result + consumersBeforeDispatch;
result = 31 * result + Long.hashCode(delayBeforeDispatch);
result = 31 * result + (purgeOnNoConsumers ? 1 : 0);
+ result = 31 * result + (configurationManaged ? 1 : 0);
return result;
}
@@ -400,6 +417,7 @@ public final class QueueConfig {
+ ", lastValue=" + lastValue
+ ", consumersBeforeDispatch=" + consumersBeforeDispatch
+ ", delayBeforeDispatch=" + delayBeforeDispatch
- + ", purgeOnNoConsumers=" + purgeOnNoConsumers + '}';
+ + ", purgeOnNoConsumers=" + purgeOnNoConsumers
+ + ", configurationManaged=" + configurationManaged + '}';
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index dcb6e02..b5a49ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2729,7 +2729,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
private List<Queue> listConfiguredQueues(SimpleString address) throws Exception {
- return listQueues(address).stream().filter(queue -> !queue.isAutoCreated() && !queue.isInternalQueue()).collect(Collectors.toList());
+ return listQueues(address).stream().filter(queue -> queue.isConfigurationManaged()).collect(Collectors.toList());
}
private List<Queue> listQueues(SimpleString address) throws Exception {
@@ -2794,7 +2794,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
} else {
// if the address::queue doesn't exist then create it
try {
- createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true);
+ createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true, true);
} catch (ActiveMQQueueExistsException e) {
// the queue may exist on a *different* address
ActiveMQServerLogger.LOGGER.warn(e.getMessage());
@@ -3117,6 +3117,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final boolean autoCreateAddress) throws Exception {
+ return createQueue(address, routingType, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
+ }
+
+ private Queue createQueue(final SimpleString address,
+ final RoutingType routingType,
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final SimpleString user,
+ final boolean durable,
+ final boolean temporary,
+ final boolean ignoreIfExists,
+ final boolean transientQueue,
+ final boolean autoCreated,
+ final int maxConsumers,
+ final boolean purgeOnNoConsumers,
+ final boolean exclusive,
+ final boolean lastValue,
+ final int consumersBeforeDispatch,
+ final long delayBeforeDispatch,
+ final boolean autoCreateAddress,
+ final boolean configurationManaged) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
@@ -3170,6 +3191,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
.lastValue(lastValue)
.consumersBeforeDispatch(consumersBeforeDispatch)
.delayBeforeDispatch(delayBeforeDispatch)
+ .configurationManaged(configurationManaged)
.build();
if (hasBrokerQueuePlugins()) {
@@ -3265,8 +3287,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
String user) throws Exception {
+ return updateQueue(name, routingType, filterString, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user, null);
+ }
+
+ private Queue updateQueue(String name,
+ RoutingType routingType,
+ String filterString,
+ Integer maxConsumers,
+ Boolean purgeOnNoConsumers,
+ Boolean exclusive,
+ Integer consumersBeforeDispatch,
+ Long delayBeforeDispatch,
+ String user,
+ Boolean configurationManaged) throws Exception {
final Filter filter = FilterImpl.createFilter(filterString);
- final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
+ final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user), configurationManaged);
if (queueBinding != null) {
final Queue queue = queueBinding.getQueue();
return queue;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 7c2ffee..fd62749 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -66,6 +66,7 @@ public class LastValueQueue extends QueueImpl {
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean purgeOnNoConsumers,
+ final boolean configurationManaged,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -73,7 +74,7 @@ public class LastValueQueue extends QueueImpl {
final ArtemisExecutor executor,
final ActiveMQServer server,
final QueueFactory factory) {
- super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+ super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 59a318c..59b1649 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -155,7 +155,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
.lastValue(queueBindingInfo.isLastValue())
.consumersBeforeDispatch(queueBindingInfo.getConsumersBeforeDispatch())
.delayBeforeDispatch(queueBindingInfo.getDelayBeforeDispatch())
- .routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
+ .routingType(RoutingType.getType(queueBindingInfo.getRoutingType()))
+ .configurationManaged((queueBindingInfo.isConfigurationManaged()));
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 24b36e6..c8835d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -74,11 +74,10 @@ public class QueueFactoryImpl implements QueueFactory {
public Queue createQueueWith(final QueueConfig config) {
final Queue queue;
if (config.isLastValue()) {
- queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+ queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else {
- queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+ queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
}
-
server.getCriticalAnalyzer().add(queue);
return queue;
}
@@ -102,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (addressSettings.isDefaultLastValueQueue()) {
- queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+ queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
} else {
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2891350..69d4336 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -279,9 +279,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public volatile long dispatchStartTime = -1;
- private int consumersBeforeDispatch = 0;
+ private volatile int consumersBeforeDispatch = 0;
- private long delayBeforeDispatch = 0;
+ private volatile long delayBeforeDispatch = 0;
+
+ private volatile boolean configurationManaged;
/**
@@ -429,7 +431,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final ArtemisExecutor executor,
final ActiveMQServer server,
final QueueFactory factory) {
- this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+ this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, purgeOnNoConsumers, false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
}
public QueueImpl(final long id,
@@ -447,6 +449,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final Integer consumersBeforeDispatch,
final Long delayBeforeDispatch,
final Boolean purgeOnNoConsumers,
+ final boolean configurationManaged,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -486,6 +489,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.delayBeforeDispatch = delayBeforeDispatch == null ? ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch() : delayBeforeDispatch;
+ this.configurationManaged = configurationManaged;
+
this.postOffice = postOffice;
this.storageManager = storageManager;
@@ -663,6 +668,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
+ public boolean isConfigurationManaged() {
+ return configurationManaged;
+ }
+
+ @Override
+ public synchronized void setConfigurationManaged(boolean configurationManaged) {
+ this.configurationManaged = configurationManaged;
+ }
+
+ @Override
public SimpleString getName() {
return name;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index b21781d..3561e6f 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -834,6 +834,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public boolean isConfigurationManaged() {
+ return false;
+ }
+
+ @Override
+ public void setConfigurationManaged(boolean configurationManaged) {
+
+ }
+
+ @Override
public void recheckRefCount(OperationContext context) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index 07bc22e..cec56a0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -320,8 +320,15 @@ public class RedeployTest extends ActiveMQTestBase {
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
+ try (JMSContext jmsContext = connectionFactory.createContext()) {
+ jmsContext.createSharedDurableConsumer(jmsContext.createTopic("config_test_consumer_created_queues"),"mySub").receive(100);
+ }
+
try {
latch.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub"));
+
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
@@ -344,6 +351,9 @@ public class RedeployTest extends ActiveMQTestBase {
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
+ //Ensure queues created by clients (NOT by broker.xml are not removed when we reload).
+ Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub"));
+
Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 0a62334..26ada03 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -147,6 +147,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
@Override
+ public boolean isConfigurationManaged() {
+ return (Boolean) proxy.retrieveAttributeValue("configurationManaged");
+ }
+
+ @Override
public boolean isExclusive() {
return (Boolean) proxy.retrieveAttributeValue("exclusive");
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml b/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml
index f8d1d91..fd73f11 100644
--- a/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml
+++ b/tests/integration-tests/src/test/resources/reload-address-queues-updated.xml
@@ -117,6 +117,10 @@ under the License.
</wildcard-addresses>
<addresses>
+ <address name="config_test_consumer_created_queues">
+ <multicast>
+ </multicast>
+ </address>
<address name="config_test_queue_removal">
<multicast>
<queue name="config_test_queue_removal_queue_1"/>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/integration-tests/src/test/resources/reload-address-queues.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-address-queues.xml b/tests/integration-tests/src/test/resources/reload-address-queues.xml
index ebd0f4e..74c9d08 100644
--- a/tests/integration-tests/src/test/resources/reload-address-queues.xml
+++ b/tests/integration-tests/src/test/resources/reload-address-queues.xml
@@ -120,6 +120,10 @@ under the License.
</wildcard-addresses>
<addresses>
+ <address name="config_test_consumer_created_queues">
+ <multicast>
+ </multicast>
+ </address>
<address name="config_test_queue_removal">
<multicast>
<queue name="config_test_queue_removal_queue_1"/>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 5297ab6..7c1297d 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -106,6 +106,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public boolean isConfigurationManaged() {
+ return false;
+ }
+
+ @Override
+ public void setConfigurationManaged(boolean configurationManaged) {
+
+ }
+
+ @Override
public boolean isInternalQueue() {
// no-op
return false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c417d0b5/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 0bbe8ef..5f128ea 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -53,7 +53,8 @@ public class FakePostOffice implements PostOffice {
Boolean exclusive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
- SimpleString user) throws Exception {
+ SimpleString user,
+ Boolean configurationManaged) throws Exception {
return null;
}
[2/2] activemq-artemis git commit: This closes #2300
Posted by cl...@apache.org.
This closes #2300
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4507d778
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4507d778
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4507d778
Branch: refs/heads/master
Commit: 4507d7783f9f609be8b0946adb4d148330d6513e
Parents: 71390fe c417d0b
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Sep 11 13:55:12 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 11 13:55:12 2018 -0400
----------------------------------------------------------------------
.../api/core/management/QueueControl.java | 6 +++
.../core/management/impl/QueueControlImpl.java | 12 ++++++
.../core/persistence/QueueBindingInfo.java | 4 ++
.../journal/AbstractJournalStorageManager.java | 2 +-
.../codec/PersistentQueueBindingEncoding.java | 27 ++++++++++++-
.../artemis/core/postoffice/PostOffice.java | 3 +-
.../core/postoffice/impl/PostOfficeImpl.java | 7 +++-
.../activemq/artemis/core/server/Queue.java | 4 ++
.../artemis/core/server/QueueConfig.java | 24 ++++++++++--
.../core/server/impl/ActiveMQServerImpl.java | 41 ++++++++++++++++++--
.../core/server/impl/LastValueQueue.java | 3 +-
.../server/impl/PostOfficeJournalLoader.java | 3 +-
.../core/server/impl/QueueFactoryImpl.java | 7 ++--
.../artemis/core/server/impl/QueueImpl.java | 21 ++++++++--
.../impl/ScheduledDeliveryHandlerTest.java | 10 +++++
.../tests/integration/jms/RedeployTest.java | 10 +++++
.../management/QueueControlUsingCoreTest.java | 5 +++
.../resources/reload-address-queues-updated.xml | 4 ++
.../test/resources/reload-address-queues.xml | 4 ++
.../unit/core/postoffice/impl/FakeQueue.java | 10 +++++
.../core/server/impl/fakes/FakePostOffice.java | 3 +-
21 files changed, 189 insertions(+), 21 deletions(-)
----------------------------------------------------------------------