You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/11/07 16:36:41 UTC
[21/50] [abbrv] activemq-artemis git commit: Implemented MaxConsumers
DeleteOnNoConsumers for Queues
Implemented MaxConsumers DeleteOnNoConsumers for Queues
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/541e4e0a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/541e4e0a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/541e4e0a
Branch: refs/heads/ARTEMIS-780
Commit: 541e4e0a5178b61d1fba145e7d203824be4223e1
Parents: 47f47e8
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Nov 1 10:19:55 2016 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 7 11:28:07 2016 -0500
----------------------------------------------------------------------
.../artemis/api/core/ActiveMQExceptionType.java | 8 +-
.../ActiveMQQueueMaxConsumerLimitReached.java | 31 ++++++
.../core/ServerSessionPacketHandler.java | 8 ++
.../core/server/ActiveMQMessageBundle.java | 3 +
.../artemis/core/server/ActiveMQServer.java | 26 ++++-
.../activemq/artemis/core/server/Queue.java | 2 +-
.../core/server/impl/ActiveMQServerImpl.java | 50 +++++++---
.../core/server/impl/LastValueQueue.java | 4 +-
.../server/impl/PostOfficeJournalLoader.java | 4 +-
.../core/server/impl/QueueFactoryImpl.java | 6 +-
.../artemis/core/server/impl/QueueImpl.java | 63 ++++++++++++
.../core/server/impl/ServerConsumerImpl.java | 1 +
.../impl/ScheduledDeliveryHandlerTest.java | 10 ++
.../integration/addressing/AddressingTest.java | 100 ++++++++++++++++---
.../integration/client/HangConsumerTest.java | 8 +-
.../unit/core/postoffice/impl/FakeQueue.java | 10 ++
16 files changed, 290 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index 752574a..0221562 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -213,7 +213,13 @@ public enum ActiveMQExceptionType {
}
},
- NOT_IMPLEMTNED_EXCEPTION(213);
+ NOT_IMPLEMTNED_EXCEPTION(213),
+ MAX_CONSUMER_LIMIT_EXCEEDED(214) {
+ @Override
+ public ActiveMQException createException(String msg) {
+ return new ActiveMQQueueMaxConsumerLimitReached(msg);
+ }
+ };
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java
new file mode 100644
index 0000000..0577e08
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * An operation failed because the maximum number of consumers allowed on a queue has been reached.
+ */
+public final class ActiveMQQueueMaxConsumerLimitReached extends ActiveMQException {
+
+ public ActiveMQQueueMaxConsumerLimitReached() {
+ super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED);
+ }
+
+ public ActiveMQQueueMaxConsumerLimitReached(String msg) {
+ super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED, msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index b52534c..2a45f29 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -494,6 +495,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} else {
ActiveMQServerLogger.LOGGER.caughtXaException(e);
}
+ } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+ if (requiresResponse) {
+ logger.debug("Sending exception to client", e);
+ response = new ActiveMQExceptionMessage(e);
+ } else {
+ ActiveMQServerLogger.LOGGER.caughtException(e);
+ }
} catch (ActiveMQException e) {
if (requiresResponse) {
logger.debug("Sending exception to client", e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index f22873b..769d183 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionExcep
import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
@@ -378,4 +379,6 @@ public interface ActiveMQMessageBundle {
@Message(id = 119119, value = "Disk Capacity is Low, cannot produce more messages.")
ActiveMQIOErrorException diskBeyondLimit();
+ @Message(id = 119200, value = "Maximum Consumer Limit Reached on Queue:(address={0},queue={1})", format = Message.Format.MESSAGE_FORMAT)
+ ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index ed45645..749969a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -301,6 +301,14 @@ public interface ActiveMQServer extends ActiveMQComponent {
Queue createQueue(SimpleString address,
SimpleString queueName,
+ SimpleString filterString,
+ boolean durable,
+ boolean temporary,
+ Integer maxConsumers,
+ Boolean deleteOnNoConsumers) throws Exception;
+
+ Queue createQueue(SimpleString address,
+ SimpleString queueName,
SimpleString filter,
SimpleString user,
boolean durable,
@@ -393,10 +401,22 @@ public interface ActiveMQServer extends ActiveMQComponent {
AddressInfo getAddressInfo(SimpleString address);
+ Queue createQueue(SimpleString addressName,
+ SimpleString queueName,
+ SimpleString filterString,
+ SimpleString user,
+ boolean durable,
+ boolean temporary,
+ boolean ignoreIfExists,
+ boolean transientQueue,
+ boolean autoCreated,
+ Integer maxConsumers,
+ Boolean deleteOnNoConsumers) throws Exception;
+
/*
- * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
- * replace any factories with the same protocol
- * */
+ * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is true then this factory will
+ * replace any factories with the same protocol
+ * */
void addProtocolManagerFactory(ProtocolManagerFactory factory);
/*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 270e0cd..2b845d5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -48,7 +48,7 @@ public interface Queue extends Bindable {
boolean isDeleteOnNoConsumers();
- boolean getMaxConsumers();
+ int getMaxConsumers();
void addConsumer(Consumer consumer) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 9421df3..ba63bb3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1433,6 +1433,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
+ final boolean durable,
+ final boolean temporary,
+ final Integer maxConsumers,
+ final Boolean deleteOnNoConsumers) throws Exception {
+ return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers);
+ }
+
+ @Override
+ public Queue createQueue(final SimpleString address,
+ final SimpleString queueName,
+ final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary) throws Exception {
@@ -2261,17 +2272,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
null);
}
- private Queue createQueue(final SimpleString addressName,
- final SimpleString queueName,
- final SimpleString filterString,
- final SimpleString user,
- final boolean durable,
- final boolean temporary,
- final boolean ignoreIfExists,
- final boolean transientQueue,
- final boolean autoCreated,
- final Integer maxConsumers,
- final Boolean deleteOnNoConsumers) throws Exception {
+ @Override
+ public Queue createQueue(final SimpleString addressName,
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final SimpleString user,
+ final boolean durable,
+ final boolean temporary,
+ final boolean ignoreIfExists,
+ final boolean transientQueue,
+ final boolean autoCreated,
+ final Integer maxConsumers,
+ final Boolean deleteOnNoConsumers) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
@@ -2297,8 +2309,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
address = addressName;
}
+
+ AddressInfo defaultAddressInfo = new AddressInfo(address);
// FIXME This boils down to a putIfAbsent (avoids race). This should be reflected in the API.
- AddressInfo info = postOffice.addAddressInfo(new AddressInfo(address));
+ AddressInfo info = postOffice.addAddressInfo(defaultAddressInfo);
+
+ boolean addressExists = true;
+ if (info == null) {
+ info = defaultAddressInfo;
+ addressExists = false;
+ }
final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers;
@@ -2323,10 +2343,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
if (queue.isDurable()) {
- storageManager.addQueueBinding(txID, localQueueBinding);
- if (info == null) {
- storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
+ if (!addressExists) {
+ storageManager.addAddressBinding(txID, getAddressInfo(address));
}
+ storageManager.addQueueBinding(txID, localQueueBinding);
}
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 453f588..a4fa5dc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -56,12 +56,14 @@ public class LastValueQueue extends QueueImpl {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
+ final Integer maxConsumers,
+ final Boolean deleteOnNoConsumers,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
- super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
new Exception("LastValueQeue " + this).toString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 9bd14f0..6f4cf03 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -148,8 +148,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
.durable(true)
.temporary(false)
.autoCreated(queueBindingInfo.isAutoCreated())
- .de
- );
+ .deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers())
+ .maxConsumers(queueBindingInfo.getMaxConsumers());
final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 3678553..bcc7c79 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -75,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
- queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
} else {
- queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
}
return queue;
}
@@ -101,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (addressSettings.isLastValueQueue()) {
- queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
} else {
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e01c81e..a37bb50 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -53,6 +54,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.AddressManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
@@ -238,6 +240,14 @@ public class QueueImpl implements Queue {
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
+ private int maxConsumers;
+
+ private boolean deleteOnNoConsumers;
+
+ private final AddressInfo addressInfo;
+
+ private final AtomicInteger noConsumers = new AtomicInteger(0);
+
/**
* This is to avoid multi-thread races on calculating direct delivery,
* to guarantee ordering will be always be correct
@@ -334,10 +344,32 @@ public class QueueImpl implements Queue {
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
+ this(id, address, name, filter, null, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ }
+
+ public QueueImpl(final long id,
+ final SimpleString address,
+ final SimpleString name,
+ final Filter filter,
+ final PageSubscription pageSubscription,
+ final SimpleString user,
+ final boolean durable,
+ final boolean temporary,
+ final boolean autoCreated,
+ final Integer maxConsumers,
+ final Boolean deleteOnNoConsumers,
+ final ScheduledExecutorService scheduledExecutor,
+ final PostOffice postOffice,
+ final StorageManager storageManager,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final Executor executor) {
+
this.id = id;
this.address = address;
+ this.addressInfo = postOffice.getAddressInfo(address);
+
this.name = name;
this.filter = filter;
@@ -350,6 +382,10 @@ public class QueueImpl implements Queue {
this.autoCreated = autoCreated;
+ this.maxConsumers = maxConsumers == null ? addressInfo.getDefaultMaxConsumers() : maxConsumers;
+
+ this.deleteOnNoConsumers = deleteOnNoConsumers == null ? addressInfo.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
+
this.postOffice = postOffice;
this.storageManager = storageManager;
@@ -437,6 +473,16 @@ public class QueueImpl implements Queue {
}
@Override
+ public boolean isDeleteOnNoConsumers() {
+ return deleteOnNoConsumers;
+ }
+
+ @Override
+ public int getMaxConsumers() {
+ return maxConsumers;
+ }
+
+ @Override
public SimpleString getName() {
return name;
}
@@ -709,6 +755,11 @@ public class QueueImpl implements Queue {
}
synchronized (this) {
+
+ if (maxConsumers != -1 && noConsumers.get() >= maxConsumers) {
+ throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
+ }
+
flushDeliveriesInTransit();
consumersChanged = true;
@@ -722,6 +773,8 @@ public class QueueImpl implements Queue {
if (refCountForConsumers != null) {
refCountForConsumers.increment();
}
+
+ noConsumers.incrementAndGet();
}
}
@@ -770,6 +823,15 @@ public class QueueImpl implements Queue {
if (refCountForConsumers != null) {
refCountForConsumers.decrement();
}
+
+ if (noConsumers.decrementAndGet() == 0 && deleteOnNoConsumers) {
+ try {
+ deleteQueue();
+ }
+ catch (Exception e) {
+ logger.error("Error deleting queue on no consumers. " + this.toString(), e);
+ }
+ }
}
}
@@ -1361,6 +1423,7 @@ public class QueueImpl implements Queue {
@Override
public void deleteQueue(boolean removeConsumers) throws Exception {
synchronized (this) {
+ if (this.queueDestroyed) return;
this.queueDestroyed = true;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 98a9c84..389b07e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -205,6 +205,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.creationTime = System.currentTimeMillis();
+
if (browseOnly) {
browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 55a287a..11b11ab 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -901,6 +901,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public boolean isDeleteOnNoConsumers() {
+ return false;
+ }
+
+ @Override
+ public int getMaxConsumers() {
+ return -1;
+ }
+
+ @Override
public void addConsumer(Consumer consumer) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index 03739e9..a21a62b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -19,8 +19,11 @@ package org.apache.activemq.artemis.tests.integration.addressing;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -187,8 +190,6 @@ public class AddressingTest extends ActiveMQTestBase {
assertEquals(0, count);
}
-
-
@Test
public void testMulticastRoutingBackwardsCompat() throws Exception {
@@ -222,34 +223,103 @@ public class AddressingTest extends ActiveMQTestBase {
}
}
- @Ignore
@Test
- public void testDeleteQueueOnNoConsumersTrue() {
- fail("Not Implemented");
+ public void testDeleteQueueOnNoConsumersTrue() throws Exception {
+
+ SimpleString address = new SimpleString("test.address");
+ SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+ // Create a queue with deleteOnNoConsumers enabled; closing its only consumer should delete the queue
+ boolean deleteOnNoConsumers = true;
+ Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers);
+
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ consumer1.close();
+
+ assertFalse(server.queueQuery(queueName).isExists());
}
- @Ignore
@Test
- public void testDeleteQueueOnNoConsumersFalse() {
- fail("Not Implemented");
+ public void testDeleteQueueOnNoConsumersFalse() throws Exception {
+ SimpleString address = new SimpleString("test.address");
+ SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+ // Create a queue with deleteOnNoConsumers disabled; the queue should still exist after its only consumer closes
+ boolean deleteOnNoConsumers = false;
+ Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers);
+
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ consumer1.close();
+
+ assertTrue(server.queueQuery(queueName).isExists());
}
- @Ignore
@Test
- public void testLimitOnMaxConsumers() {
- fail("Not Implemented");
+ public void testLimitOnMaxConsumers() throws Exception {
+ SimpleString address = new SimpleString("test.address");
+ SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+ // Create a queue with maxConsumers set to 0 so that the first attempt to create a consumer exceeds the limit
+ boolean deleteOnNoConsumers = false;
+ Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers);
+
+ Exception expectedException = null;
+ String expectedMessage = "Maximum Consumer Limit Reached on Queue";
+ try {
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ ClientConsumer consumer1 = session.createConsumer(q1.getName());
+ }
+ catch (ActiveMQQueueMaxConsumerLimitReached e) {
+ expectedException = e;
+ }
+
+ assertNotNull(expectedException);
+ assertTrue(expectedException.getMessage().contains(expectedMessage));
+ assertTrue(expectedException.getMessage().contains(address));
+ assertTrue(expectedException.getMessage().contains(queueName));
}
@Ignore
@Test
- public void testUnlimitedMaxConsumers() {
- fail("Not Implemented");
+ public void testUnlimitedMaxConsumers() throws Exception { // NOTE(review): the @Ignore above appears as an unchanged context line, so this test presumably still does not run - confirm
+ int noConsumers = 50;
+ SimpleString address = new SimpleString("test.address");
+ SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+ // Opens noConsumers consumers on a queue created with -1 in what appears to be the maxConsumers position (presumably "unlimited" - confirm); there is no assertion, so the test only fails if createConsumer throws
+ boolean deleteOnNoConsumers = false;
+ Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers);
+
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ for (int i = 0; i < noConsumers; i++) {
+ session.createConsumer(q1.getName()); // all consumers share the single session; none are closed here
+ }
}
@Ignore
@Test
- public void testDefaultMaxConsumersFromAddress() {
- fail("Not Implemented");
+ public void testDefaultMaxConsumersFromAddress() throws Exception { // NOTE(review): the @Ignore above appears as an unchanged context line, so this test presumably still does not run - confirm
+ int noConsumers = 50;
+ SimpleString address = new SimpleString("test.address");
+ SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+ // NOTE(review): addressInfo below is configured with defaultMaxConsumers 0 but is not passed to the server anywhere in this method, and nothing is asserted after the consumer loop - confirm what this test is meant to verify
+ boolean deleteOnNoConsumers = false;
+ AddressInfo addressInfo = new AddressInfo(address);
+ addressInfo.setDefaultMaxConsumers(0);
+ Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers); // second null is presumably maxConsumers, i.e. no per-queue value is supplied - confirm
+
+ ClientSession session = sessionFactory.createSession();
+ session.start();
+
+ for (int i = 0; i < noConsumers; i++) {
+ session.createConsumer(q1.getName());
+ }
}
@Ignore
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 2fd5915..124ece3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -224,12 +224,14 @@ public class HangConsumerTest extends ActiveMQTestBase {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
+ final Integer maxConsumers,
+ final Boolean deleteOnNoConsumers,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
- super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
}
@Override
@@ -256,7 +258,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
@Override
public Queue createQueueWith(final QueueConfig config) {
- queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); // now also forwards config.maxConsumers() and config.isDeleteOnNoConsumers() to the queue constructor
return queue;
}
@@ -271,7 +273,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
final boolean durable,
final boolean temporary,
final boolean autoCreated) {
- queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
return queue;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/541e4e0a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 9a20d70..ef5c05e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -443,6 +443,16 @@ public class FakeQueue implements Queue {
}
@Override
+ public boolean isDeleteOnNoConsumers() {
+ return false; // this fake always reports false; no state is consulted
+ }
+
+ @Override
+ public int getMaxConsumers() {
+ return -1; // fixed value; NOTE(review): -1 presumably means "no limit" - confirm against QueueImpl
+ }
+
+ @Override
public LinkedListIterator<MessageReference> iterator() {
// no-op
return null;