You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/11/07 16:36:43 UTC
[23/50] [abbrv] activemq-artemis git commit: Add Queue Meta Data for
DeleteOnNoConsumers, MaxConsumers
Add Queue Meta Data for DeleteOnNoConsumers, MaxConsumers
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47f47e85
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47f47e85
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47f47e85
Branch: refs/heads/ARTEMIS-780
Commit: 47f47e85b870382effd3f4a47234b94756b094b7
Parents: d70bf3b
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Oct 31 13:19:02 2016 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 7 11:28:07 2016 -0500
----------------------------------------------------------------------
.../impl/ActiveMQServerControlImpl.java | 1 -
.../core/persistence/QueueBindingInfo.java | 7 ++
.../codec/PersistentQueueBindingEncoding.java | 43 ++++++++++-
.../artemis/core/server/ActiveMQServer.java | 9 +++
.../activemq/artemis/core/server/Queue.java | 4 ++
.../artemis/core/server/QueueConfig.java | 51 ++++++++++++-
.../core/server/impl/ActiveMQServerImpl.java | 76 +++++++++++++++++---
.../artemis/core/server/impl/AddressInfo.java | 5 +-
.../server/impl/PostOfficeJournalLoader.java | 8 ++-
9 files changed, 185 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index b0e8b9b..fcbf15c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -573,7 +573,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
clearIO();
try {
-
server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
} finally {
blockOnIO();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 8c80a8a..4d435c6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -46,4 +46,11 @@ public interface QueueBindingInfo {
List<QueueStatusEncoding> getQueueStatusEncodings();
+ int getMaxConsumers();
+
+ void setMaxConsumers(int maxConsumers);
+
+ boolean isDeleteOnNoConsumers();
+
+ void setDeleteOnNoConsumers();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 039460c..78a81ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -41,6 +41,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public List<QueueStatusEncoding> queueStatusEncodings;
+ public int maxConsumers;
+
+ public boolean deleteOnNoConsumers;
+
public PersistentQueueBindingEncoding() {
}
@@ -57,6 +61,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
user +
", autoCreated=" +
autoCreated +
+ ", maxConsumers=" +
+ maxConsumers +
+ ", deleteOnNoConsumers=" +
+ deleteOnNoConsumers +
"]";
}
@@ -125,6 +133,26 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
@Override
+ public int getMaxConsumers() {
+ return 0;
+ }
+
+ @Override
+ public void setMaxConsumers(int maxConsumers) {
+
+ }
+
+ @Override
+ public boolean isDeleteOnNoConsumers() {
+ return false;
+ }
+
+ @Override
+ public void setDeleteOnNoConsumers() {
+
+ }
+
+ @Override
public void decode(final ActiveMQBuffer buffer) {
name = buffer.readSimpleString();
address = buffer.readSimpleString();
@@ -144,6 +172,15 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
autoCreated = buffer.readBoolean();
+
+ if (buffer.readableBytes() > 0) {
+ maxConsumers = buffer.readInt();
+ deleteOnNoConsumers = buffer.readBoolean();
+ }
+ else {
+ maxConsumers = -1;
+ deleteOnNoConsumers = false;
+ }
}
@Override
@@ -153,13 +190,17 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeNullableSimpleString(filterString);
buffer.writeNullableSimpleString(createMetadata());
buffer.writeBoolean(autoCreated);
+ buffer.writeInt(maxConsumers);
+ buffer.writeBoolean(deleteOnNoConsumers);
}
@Override
public int getEncodeSize() {
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
- SimpleString.sizeofNullableString(createMetadata());
+ SimpleString.sizeofNullableString(createMetadata()) +
+ DataConstants.SIZE_INT +
+ DataConstants.SIZE_BOOLEAN;
}
private SimpleString createMetadata() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index a6256d8..ed45645 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -333,6 +333,15 @@ public interface ActiveMQServer extends ActiveMQComponent {
QueueQueryResult queueQuery(SimpleString name) throws Exception;
+ Queue deployQueue(SimpleString address,
+ SimpleString resourceName,
+ SimpleString filterString,
+ boolean durable,
+ boolean temporary,
+ boolean autoCreated,
+ Integer maxConsumers,
+ Boolean deleteOnNoConsumers) throws Exception;
+
void destroyQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 52cd2f0..270e0cd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -46,6 +46,10 @@ public interface Queue extends Bindable {
boolean isAutoCreated();
+ boolean isDeleteOnNoConsumers();
+
+ boolean getMaxConsumers();
+
void addConsumer(Consumer consumer) throws Exception;
void removeConsumer(Consumer consumer);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index f750f6c..3b7ed71 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -33,6 +33,8 @@ public final class QueueConfig {
private final boolean durable;
private final boolean temporary;
private final boolean autoCreated;
+ private final int maxConsumers;
+ private final boolean deleteOnNoConsumers;
public static final class Builder {
@@ -45,6 +47,8 @@ public final class QueueConfig {
private boolean durable;
private boolean temporary;
private boolean autoCreated;
+ private int maxConsumers;
+ private boolean deleteOnNoConsumers;
private Builder(final long id, final SimpleString name) {
this(id, name, name);
@@ -60,6 +64,8 @@ public final class QueueConfig {
this.durable = true;
this.temporary = false;
this.autoCreated = true;
+ this.maxConsumers = -1;
+ this.deleteOnNoConsumers = false;
validateState();
}
@@ -106,6 +112,16 @@ public final class QueueConfig {
return this;
}
+ public Builder maxConsumers(final int maxConsumers) {
+ this.maxConsumers = maxConsumers;
+ return this;
+ }
+
+ public Builder deleteOnNoConsumers(final boolean deleteOnNoConsumers) {
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
+ return this;
+ }
+
/**
* Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}.
* <br>
@@ -127,7 +143,7 @@ public final class QueueConfig {
} else {
pageSubscription = null;
}
- return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated);
+ return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers);
}
}
@@ -168,7 +184,9 @@ public final class QueueConfig {
final SimpleString user,
final boolean durable,
final boolean temporary,
- final boolean autoCreated) {
+ final boolean autoCreated,
+ final int maxConsumers,
+ final boolean deleteOnNoConsumers) {
this.id = id;
this.address = address;
this.name = name;
@@ -178,6 +196,8 @@ public final class QueueConfig {
this.durable = durable;
this.temporary = temporary;
this.autoCreated = autoCreated;
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
+ this.maxConsumers = maxConsumers;
}
public long id() {
@@ -216,6 +236,14 @@ public final class QueueConfig {
return autoCreated;
}
+ public boolean isDeleteOnNoConsumers() {
+ return deleteOnNoConsumers;
+ }
+
+ public int maxConsumers() {
+ return maxConsumers;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -241,6 +269,10 @@ public final class QueueConfig {
return false;
if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null)
return false;
+ if (maxConsumers != that.maxConsumers)
+ return false;
+ if (deleteOnNoConsumers != that.deleteOnNoConsumers)
+ return false;
return user != null ? user.equals(that.user) : that.user == null;
}
@@ -256,11 +288,24 @@ public final class QueueConfig {
result = 31 * result + (durable ? 1 : 0);
result = 31 * result + (temporary ? 1 : 0);
result = 31 * result + (autoCreated ? 1 : 0);
+ result = 31 * result + maxConsumers;
+ result = 31 * result + (deleteOnNoConsumers ? 1 : 0);
return result;
}
@Override
public String toString() {
- return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}';
+ return "QueueConfig{"
+ + "id=" + id
+ + ", address=" + address
+ + ", name=" + name
+ + ", filter=" + filter
+ + ", pageSubscription=" + pageSubscription
+ + ", user=" + user
+ + ", durable=" + durable
+ + ", temporary=" + temporary
+ + ", autoCreated=" + autoCreated
+ + ", maxConsumers=" + maxConsumers
+ + ", deleteOnNoConsumers=" + deleteOnNoConsumers + '}';
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index cce81c5..9421df3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1509,6 +1509,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
+ return deployQueue(address, resourceName, filterString, durable, temporary, autoCreated, null, null);
+ }
+
+ @Override
+ public Queue deployQueue(final SimpleString address,
+ final SimpleString resourceName,
+ final SimpleString filterString,
+ final boolean durable,
+ final boolean temporary,
+ final boolean autoCreated,
+ final Integer maxConsumers,
+ final Boolean deleteOnNoConsumers) throws Exception {
if (resourceName.toString().toLowerCase().startsWith("jms.topic")) {
ActiveMQServerLogger.LOGGER.deployTopic(resourceName);
@@ -1516,7 +1528,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
ActiveMQServerLogger.LOGGER.deployQueue(resourceName);
}
- return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated);
+ return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers);
}
@Override
@@ -2102,9 +2114,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
- deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false);
+ deployQueue(SimpleString.toSimpleString(config.getAddress()),
+ SimpleString.toSimpleString(config.getName()),
+ SimpleString.toSimpleString(config.getFilterString()),
+ config.isDurable(),
+ false,
+ false,
+ config.getMaxConsumers(),
+ config.getDeleteOnNoConsumers());
}
}
+
private void deployQueuesFromConfiguration() throws Exception {
deployQueuesFromListCoreQueueConfiguration(configuration.getQueueConfigurations());
}
@@ -2228,6 +2248,30 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean ignoreIfExists,
final boolean transientQueue,
final boolean autoCreated) throws Exception {
+ return createQueue(addressName,
+ queueName,
+ filterString,
+ user,
+ durable,
+ temporary,
+ ignoreIfExists,
+ transientQueue,
+ autoCreated,
+ null,
+ null);
+ }
+
+ private Queue createQueue(final SimpleString addressName,
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final SimpleString user,
+ final boolean durable,
+ final boolean temporary,
+ final boolean ignoreIfExists,
+ final boolean transientQueue,
+ final boolean autoCreated,
+ final Integer maxConsumers,
+ final Boolean deleteOnNoConsumers) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
@@ -2244,21 +2288,31 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final long queueID = storageManager.generateID();
final QueueConfig.Builder queueConfigBuilder;
+ final SimpleString address;
if (addressName == null) {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName);
+ address = queueName;
} else {
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName);
+ address = addressName;
}
- final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
- final Queue queue = queueFactory.createQueueWith(queueConfig);
- boolean addressAlreadyExists = true;
+ // FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API.
+ AddressInfo info = postOffice.addAddressInfo(new AddressInfo(address));
- if (postOffice.getAddressInfo(queue.getAddress()) == null) {
- postOffice.addAddressInfo(new AddressInfo(queue.getAddress())
- .setRoutingType(AddressInfo.RoutingType.MULTICAST));
- addressAlreadyExists = false;
- }
+ final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
+ final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers;
+
+ final QueueConfig queueConfig = queueConfigBuilder.filter(filter)
+ .pagingManager(pagingManager)
+ .user(user)
+ .durable(durable)
+ .temporary(temporary)
+ .autoCreated(autoCreated)
+ .deleteOnNoConsumers(isDeleteOnNoConsumers)
+ .maxConsumers(noMaxConsumers)
+ .build();
+ final Queue queue = queueFactory.createQueueWith(queueConfig);
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
@@ -2270,7 +2324,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding);
- if (!addressAlreadyExists) {
+ if (info == null) {
storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 4e982c4..7c71c1f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
public class AddressInfo {
@@ -24,9 +25,9 @@ public class AddressInfo {
private RoutingType routingType = RoutingType.MULTICAST;
- private boolean defaultDeleteOnNoConsumers;
+ private boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
- private int defaultMaxConsumers;
+ private int defaultMaxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
public AddressInfo(SimpleString name) {
this.name = name;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 4e89e8a..9bd14f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -143,7 +143,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
} else {
queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress());
}
- queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated());
+ queueConfigBuilder.filter(filter).pagingManager(pagingManager)
+ .user(queueBindingInfo.getUser())
+ .durable(true)
+ .temporary(false)
+ .autoCreated(queueBindingInfo.isAutoCreated())
+ .de
+ );
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));