You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/22 14:45:56 UTC
[2/2] activemq-artemis git commit: ARTEMIS-743 Created QueueConfig
that replace and enable additional behaviours on QueueFactory. Added Filter
predicate.
ARTEMIS-743 Created QueueConfig that replace and enable additional behaviours on QueueFactory.
Added Filter predicate.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c002cf13
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c002cf13
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c002cf13
Branch: refs/heads/master
Commit: c002cf13b84308549db99c07918bf1075a5b75be
Parents: e790c78
Author: Francesco Nigro <fn...@redhat.com>
Authored: Tue Sep 20 22:39:28 2016 +0200
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Sep 22 15:45:14 2016 +0100
----------------------------------------------------------------------
.../core/management/ActiveMQServerControl.java | 14 +-
.../activemq/artemis/core/filter/Filter.java | 10 +
.../artemis/core/filter/FilterUtils.java | 36 +++
.../impl/ActiveMQServerControlImpl.java | 10 +-
.../artemis/core/server/ActiveMQServer.java | 16 +-
.../artemis/core/server/QueueConfig.java | 270 +++++++++++++++++++
.../artemis/core/server/QueueFactory.java | 8 +-
.../core/server/impl/ActiveMQServerImpl.java | 124 +++++----
.../server/impl/PostOfficeJournalLoader.java | 48 ++--
.../core/server/impl/QueueFactoryImpl.java | 15 ++
.../resources/schema/artemis-configuration.xsd | 116 ++++----
.../artemis/core/server/QueueConfigTest.java | 46 ++++
.../integration/client/HangConsumerTest.java | 18 +-
.../client/InterruptedLargeMessageTest.java | 23 +-
.../core/server/impl/QueueConcurrentTest.java | 4 +-
.../server/impl/fakes/FakeQueueFactory.java | 9 +-
16 files changed, 597 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index ab78ef9..bb55d19 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -408,6 +408,8 @@ public interface ActiveMQServerControl {
/**
* Create a durable queue.
* <br>
+ * If {@code address} is {@code null} it will be defaulted to {@code name}.
+ * <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
@@ -420,6 +422,8 @@ public interface ActiveMQServerControl {
/**
* Create a queue.
* <br>
+ * If {@code address} is {@code null} it will be defaulted to {@code name}.
+ * <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
@@ -436,6 +440,8 @@ public interface ActiveMQServerControl {
/**
* Create a queue.
* <br>
+ * If {@code address} is {@code null} it will be defaulted to {@code name}.
+ * <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
@@ -450,6 +456,8 @@ public interface ActiveMQServerControl {
/**
* Deploy a durable queue.
* <br>
+ * If {@code address} is {@code null} it will be defaulted to {@code name}.
+ * <br>
* This method will do nothing if the queue with the given name already exists on the server.
*
* @param address address to bind the queue to
@@ -464,6 +472,8 @@ public interface ActiveMQServerControl {
/**
* Deploy a queue.
* <br>
+ * If {@code address} is {@code null} it will be defaulted to {@code name}.
+ * <br>
* This method will do nothing if the queue with the given name already exists on the server.
*
* @param address address to bind the queue to
@@ -645,7 +655,7 @@ public interface ActiveMQServerControl {
/**
* Lists all the consumers connected to this server.
* The returned String is a JSON string containing details about each consumer, e.g.:
- *<pre>
+ * <pre>
* [
* {
* "queueName": "fa87c64c-0a38-4697-8421-72e34d17429d",
@@ -744,7 +754,7 @@ public interface ActiveMQServerControl {
@Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy,
@Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues,
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
- @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
+ @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;
void removeAddressSettings(String addressMatch) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
index 5dd507c..41d5e54 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java
@@ -21,6 +21,16 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
public interface Filter {
+ /**
+ * JMS Topics (which are outside of the scope of the core API) will require a dumb subscription
+ * with a dummy-filter at this current version as a way to keep its existence valid and TCK
+ * tests. That subscription needs an invalid filter, however paging needs to ignore any
+ * subscription with this filter. For that reason, this filter needs to be rejected on paging or
+ * any other component on the system, and just be ignored for any purpose It's declared here as
+ * this filter is considered a global ignore
+ */
+ String GENERIC_IGNORED_FILTER = "__AMQX=-1";
+
boolean match(ServerMessage message);
SimpleString getFilterString();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java
new file mode 100644
index 0000000..c5b1a0a
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.filter;
+
+public final class FilterUtils {
+
+ private FilterUtils() {
+
+ }
+
+ /**
+ * Returns {@code true} if {@code filter} is a {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}.
+ *
+ * @param filter a subscription filter
+ * @return {@code true} if {@code filter} is not {@code null} and is a {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}
+ */
+ public static boolean isTopicIdentification(final Filter filter) {
+ return filter != null && filter.getFilterString() != null && filter.getFilterString().toString().equals(Filter.GENERIC_IGNORED_FILTER);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 6aabbe3..362b74a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -551,7 +551,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- server.deployQueue(new SimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false);
+ server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false);
}
finally {
blockOnIO();
@@ -569,7 +569,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- server.deployQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+ server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
}
finally {
blockOnIO();
@@ -582,7 +582,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
+ server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, true, false);
}
finally {
blockOnIO();
@@ -595,7 +595,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
+ server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, durable, false);
}
finally {
blockOnIO();
@@ -616,7 +616,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
- server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+ server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
}
finally {
blockOnIO();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index ac65335..588c17c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -95,10 +95,14 @@ public interface ActiveMQServer extends ActiveMQComponent {
NodeManager getNodeManager();
- /** it will release hold a lock for the activation. */
+ /**
+ * it will release hold a lock for the activation.
+ */
void unlockActivation();
- /** it will hold a lock for the activation. This will prevent the activation from happening. */
+ /**
+ * it will hold a lock for the activation. This will prevent the activation from happening.
+ */
void lockActivation();
/**
@@ -266,15 +270,17 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException;
/**
- * Creates a shared queue. if non durable it will exist as long as there are consumers.
- *
+ * Creates a transient queue. A queue that will exist as long as there are consumers.
+ * The queue will be deleted as soon as all the consumers are removed.
+ * <p>
* Notice: the queue won't be deleted until the first consumer arrives.
*
* @param address
* @param name
* @param filterString
* @param durable
- * @throws Exception
+ * @throws ActiveMQInvalidTransientQueueUseException if the shared queue already exists with a different {@code address} or {@code filter}
+ * @throws NullPointerException if {@code address} is {@code null}
*/
void createSharedQueue(final SimpleString address,
final SimpleString name,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
new file mode 100644
index 0000000..64df0da
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.filter.FilterUtils;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+
+public final class QueueConfig {
+
+ private final long id;
+ private final SimpleString address;
+ private final SimpleString name;
+ private final Filter filter;
+ private final PageSubscription pageSubscription;
+ private final SimpleString user;
+ private final boolean durable;
+ private final boolean temporary;
+ private final boolean autoCreated;
+
+ public static final class Builder {
+
+ private final long id;
+ private final SimpleString address;
+ private final SimpleString name;
+ private Filter filter;
+ private PagingManager pagingManager;
+ private SimpleString user;
+ private boolean durable;
+ private boolean temporary;
+ private boolean autoCreated;
+
+ private Builder(final long id, final SimpleString name) {
+ this(id, name, name);
+ }
+
+ private Builder(final long id, final SimpleString name, final SimpleString address) {
+ this.id = id;
+ this.name = name;
+ this.address = address;
+ this.filter = null;
+ this.pagingManager = null;
+ this.user = null;
+ this.durable = true;
+ this.temporary = false;
+ this.autoCreated = true;
+ validateState();
+ }
+
+ private static boolean isEmptyOrNull(SimpleString value) {
+ return (value == null || value.length() == 0);
+ }
+
+ private void validateState() {
+ if (isEmptyOrNull(this.name)) {
+ throw new IllegalStateException("name can't be null!");
+ }
+ if (isEmptyOrNull(this.address)) {
+ throw new IllegalStateException("address can't be null!");
+ }
+ }
+
+ public Builder filter(final Filter filter) {
+ this.filter = filter;
+ return this;
+ }
+
+
+ public Builder pagingManager(final PagingManager pagingManager) {
+ this.pagingManager = pagingManager;
+ return this;
+ }
+
+ public Builder user(final SimpleString user) {
+ this.user = user;
+ return this;
+ }
+
+ public Builder durable(final boolean durable) {
+ this.durable = durable;
+ return this;
+ }
+
+ public Builder temporary(final boolean temporary) {
+ this.temporary = temporary;
+ return this;
+ }
+
+ public Builder autoCreated(final boolean autoCreated) {
+ this.autoCreated = autoCreated;
+ return this;
+ }
+
+
+ /**
+ * Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}.
+ * <br>
+ * The reference parameters aren't defensively copied from the {@link Builder} to the {@link QueueConfig}.
+ * <br>
+ * This method creates a new {@link PageSubscription} only if {@link #pagingManager} is not {@code null} and
+ * if {@link FilterUtils#isTopicIdentification} returns {@code false} on {@link #filter}.
+ *
+ * @throws IllegalStateException if the creation of {@link PageSubscription} fails
+ */
+ public QueueConfig build() {
+ final PageSubscription pageSubscription;
+ if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) {
+ try {
+ pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable);
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ else {
+ pageSubscription = null;
+ }
+ return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated);
+ }
+
+ }
+
+ /**
+ * Returns a new {@link Builder} of a durable, not temporary and autoCreated {@link QueueConfig} with the given {@code id} and {@code name}.
+ * <br>
+ * The {@code address} is defaulted to the {@code name} value.
+ * The reference parameters aren't defensively copied.
+ *
+ * @param id the id of the queue to be created
+ * @param name the name of the queue to be created
+ * @throws IllegalStateException if {@code name} is {@code null} or empty
+ */
+ public static Builder builderWith(final long id, final SimpleString name) {
+ return new QueueConfig.Builder(id, name);
+ }
+
+ /**
+ * Returns a new {@link Builder} of a durable, not temporary and autoCreated {@link QueueConfig} with the given {@code id}, {@code name} and {@code address}.
+ * <br>
+ * The reference parameters aren't defensively copied.
+ *
+ * @param id the id of the queue to be created
+ * @param name the name of the queue to be created
+ * @param address the address of the queue to be created
+ * @throws IllegalStateException if {@code name} or {@code address} are {@code null} or empty
+ */
+ public static Builder builderWith(final long id, final SimpleString name, final SimpleString address) {
+ return new QueueConfig.Builder(id, name, address);
+ }
+
+ private QueueConfig(final long id,
+ final SimpleString address,
+ final SimpleString name,
+ final Filter filter,
+ final PageSubscription pageSubscription,
+ final SimpleString user,
+ final boolean durable,
+ final boolean temporary,
+ final boolean autoCreated) {
+ this.id = id;
+ this.address = address;
+ this.name = name;
+ this.filter = filter;
+ this.pageSubscription = pageSubscription;
+ this.user = user;
+ this.durable = durable;
+ this.temporary = temporary;
+ this.autoCreated = autoCreated;
+ }
+
+ public long id() {
+ return id;
+ }
+
+ public SimpleString address() {
+ return address;
+ }
+
+ public SimpleString name() {
+ return name;
+ }
+
+ public Filter filter() {
+ return filter;
+ }
+
+ public PageSubscription pageSubscription() {
+ return pageSubscription;
+ }
+
+ public SimpleString user() {
+ return user;
+ }
+
+ public boolean isDurable() {
+ return durable;
+ }
+
+ public boolean isTemporary() {
+ return temporary;
+ }
+
+ public boolean isAutoCreated() {
+ return autoCreated;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ QueueConfig that = (QueueConfig) o;
+
+ if (id != that.id)
+ return false;
+ if (durable != that.durable)
+ return false;
+ if (temporary != that.temporary)
+ return false;
+ if (autoCreated != that.autoCreated)
+ return false;
+ if (address != null ? !address.equals(that.address) : that.address != null)
+ return false;
+ if (name != null ? !name.equals(that.name) : that.name != null)
+ return false;
+ if (filter != null ? !filter.equals(that.filter) : that.filter != null)
+ return false;
+ if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null)
+ return false;
+ return user != null ? user.equals(that.user) : that.user == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (id ^ (id >>> 32));
+ result = 31 * result + (address != null ? address.hashCode() : 0);
+ result = 31 * result + (name != null ? name.hashCode() : 0);
+ result = 31 * result + (filter != null ? filter.hashCode() : 0);
+ result = 31 * result + (pageSubscription != null ? pageSubscription.hashCode() : 0);
+ result = 31 * result + (user != null ? user.hashCode() : 0);
+ result = 31 * result + (durable ? 1 : 0);
+ result = 31 * result + (temporary ? 1 : 0);
+ result = 31 * result + (autoCreated ? 1 : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
index 5e9f9f1..64e7a5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java
@@ -23,12 +23,18 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
/**
* A QueueFactory
- *
+ * <p>
* Implementations of this class know how to create queues with the correct attribute values
* based on default and overrides
*/
public interface QueueFactory {
+ Queue createQueueWith(final QueueConfig config);
+
+ /**
+ * @deprecated Replaced by {@link #createQueueWith}
+ */
+ @Deprecated
Queue createQueue(long persistenceID,
final SimpleString address,
SimpleString name,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 38005ed..9bf084d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -113,6 +113,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueDeleter;
import org.apache.activemq.artemis.core.server.QueueFactory;
@@ -172,8 +173,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* subscription with this filter. For that reason, this filter needs to be rejected on paging or
* any other component on the system, and just be ignored for any purpose It's declared here as
* this filter is considered a global ignore
+ *
+ * @deprecated Replaced by {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}
*/
- public static final String GENERIC_IGNORED_FILTER = "__AMQX=-1";
+ @Deprecated
+ public static final String GENERIC_IGNORED_FILTER = Filter.GENERIC_IGNORED_FILTER;
private HAPolicy haPolicy;
@@ -184,22 +188,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as
* {@link #stop(boolean)} worked as intended.
*/
- STARTING,
- /**
+ STARTING, /**
* server is started. {@code server.isStarted()} returns {@code true}, and all assumptions
* about it hold.
*/
- STARTED,
- /**
+ STARTED, /**
* stop() was called but has not finished yet. Meant to avoids starting components while
* stop() is executing.
*/
- STOPPING,
- /**
+ STOPPING, /**
* Stopped: either stop() has been called and has finished running, or start() has never been
* called.
*/
- STOPPED;
+ STOPPED
}
private volatile SERVER_STATE state = SERVER_STATE.STOPPED;
@@ -1290,10 +1291,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SessionCallback callback,
OperationContext context,
boolean autoCreateJMSQueues) throws Exception {
- return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(),
- xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(),
- defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null,
- pagingManager);
+ return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends,
+ autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa,
+ connection, storageManager, postOffice, resourceManager, securityStore, managementService,
+ this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress),
+ callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, pagingManager);
}
@Override
@@ -1370,7 +1372,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
- total += ((LocalQueueBinding)binding).getQueue().getMessageCount();
+ total += ((LocalQueueBinding) binding).getQueue().getMessageCount();
}
}
@@ -1383,7 +1385,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
- total += ((LocalQueueBinding)binding).getQueue().getMessagesAdded();
+ total += ((LocalQueueBinding) binding).getQueue().getMessagesAdded();
}
}
@@ -1396,7 +1398,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
- total += ((LocalQueueBinding)binding).getQueue().getMessagesAcknowledged();
+ total += ((LocalQueueBinding) binding).getQueue().getMessagesAcknowledged();
}
}
@@ -1409,7 +1411,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (Binding binding : postOffice.getAllBindings().values()) {
if (binding.getType() == BindingType.LOCAL_QUEUE) {
- total += ((LocalQueueBinding)binding).getQueue().getConsumerCount();
+ total += ((LocalQueueBinding) binding).getQueue().getConsumerCount();
}
}
@@ -1461,25 +1463,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated);
}
- /**
- * Creates a transient queue. A queue that will exist as long as there are consumers.
- * The queue will be deleted as soon as all the consumers are removed.
- * <p>
- * Notice: the queue won't be deleted until the first consumer arrives.
- *
- * @param address
- * @param name
- * @param filterString
- * @param durable
- * @throws Exception
- */
@Override
public void createSharedQueue(final SimpleString address,
final SimpleString name,
final SimpleString filterString,
final SimpleString user,
boolean durable) throws Exception {
- Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false);
+ //force the old contract about address
+ if (address == null) {
+ throw new NullPointerException("address can't be null!");
+ }
+ final Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false);
if (!queue.getAddress().equals(address)) {
throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);
@@ -1490,8 +1484,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
if (logger.isDebugEnabled()) {
- logger.debug("Transient Queue " + name + " created on address " + name +
- " with filter=" + filterString);
+ logger.debug("Transient Queue " + name + " created on address " + name + " with filter=" + filterString);
}
}
@@ -1653,7 +1646,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
- public void callPostQueueDeletionCallbacks(final SimpleString address, final SimpleString queueName) throws Exception {
+ public void callPostQueueDeletionCallbacks(final SimpleString address,
+ final SimpleString queueName) throws Exception {
for (PostQueueDeletionCallback callback : postQueueDeletionCallbacks) {
callback.callback(address, queueName);
}
@@ -1933,8 +1927,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
storageManager = createStorageManager();
- if (configuration.getClusterConfigurations().size() > 0 &&
- ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) {
+ if (configuration.getClusterConfigurations().size() > 0 && ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) {
ActiveMQServerLogger.LOGGER.clusterSecurityRisk();
}
@@ -1984,7 +1977,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader());
}
-
return true;
}
@@ -2066,7 +2058,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
- /** This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests. */
+ /**
+ * This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests.
+ */
public void injectMonitor(FileStoreMonitor storeMonitor) throws Exception {
this.fileStoreMonitor = storeMonitor;
pagingManager.injectMonitor(storeMonitor);
@@ -2109,7 +2103,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
addressCount++;
}
-
long maxMemory = Runtime.getRuntime().maxMemory();
if (totalMaxSizeBytes >= maxMemory && configuration.getGlobalMaxSize() < 0) {
ActiveMQServerLogger.LOGGER.potentialOOME(addressCount, totalMaxSizeBytes, maxMemory);
@@ -2201,8 +2194,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean ignoreIfExists,
final boolean transientQueue,
final boolean autoCreated) throws Exception {
- QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
-
+ final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
if (ignoreIfExists) {
return binding.getQueue();
@@ -2212,38 +2204,37 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
- Filter filter = FilterImpl.createFilter(filterString);
-
- long txID = storageManager.generateID();
- long queueID = storageManager.generateID();
+ final Filter filter = FilterImpl.createFilter(filterString);
- PageSubscription pageSubscription;
+ final long txID = storageManager.generateID();
+ final long queueID = storageManager.generateID();
- if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER)) {
- pageSubscription = null;
+ final QueueConfig.Builder queueConfigBuilder;
+ if (address == null) {
+ queueConfigBuilder = QueueConfig.builderWith(queueID, queueName);
}
else {
- pageSubscription = pagingManager.getPageStore(address).getCursorProvider().createSubscription(queueID, filter, durable);
- }
-
- final Queue queue = queueFactory.createQueue(queueID, address, queueName, filter, pageSubscription, user, durable, temporary, autoCreated);
+ queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, address);
+ }
+ final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
+ final Queue queue = queueFactory.createQueueWith(queueConfig);
if (transientQueue) {
- queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName));
+ queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
}
- else if (autoCreated) {
- queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queueName));
+ else if (queue.isAutoCreated()) {
+ queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName()));
}
- binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId());
+ final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
- if (durable) {
- storageManager.addQueueBinding(txID, binding);
+ if (queue.isDurable()) {
+ storageManager.addQueueBinding(txID, localQueueBinding);
}
try {
- postOffice.addBinding(binding);
- if (durable) {
+ postOffice.addBinding(localQueueBinding);
+ if (queue.isDurable()) {
storageManager.commitBindings(txID);
}
}
@@ -2252,11 +2243,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (durable) {
storageManager.rollbackBindings(txID);
}
- if (queue != null) {
+ final PageSubscription pageSubscription = queue.getPageSubscription();
+ try {
queue.close();
}
- if (pageSubscription != null) {
- pageSubscription.destroy();
+ finally {
+ if (pageSubscription != null) {
+ pageSubscription.destroy();
+ }
}
}
catch (Throwable ignored) {
@@ -2265,10 +2259,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw e;
}
- managementService.registerAddress(address);
- managementService.registerQueue(queue, address, storageManager);
+ managementService.registerAddress(queue.getAddress());
+ managementService.registerQueue(queue, queue.getAddress(), storageManager);
- callPostQueueCreationCallbacks(queueName);
+ callPostQueueCreationCallbacks(queue.getName());
return queue;
}
@@ -2423,6 +2417,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
private final class ActivationThread extends Thread {
+
final Runnable runnable;
ActivationThread(Runnable runnable, String name) {
@@ -2444,6 +2439,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
private final class ConfigurationFileReloader implements ReloadCallback {
+
@Override
public void reload(URL uri) throws Exception {
Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 0645dca..ff93ffe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -29,12 +29,12 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.filter.FilterUtils;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.GroupingInfo;
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
@@ -67,12 +68,12 @@ public class PostOfficeJournalLoader implements JournalLoader {
protected final PostOffice postOffice;
protected final PagingManager pagingManager;
- private StorageManager storageManager;
+ private final StorageManager storageManager;
private final QueueFactory queueFactory;
protected final NodeManager nodeManager;
private final ManagementService managementService;
private final GroupingHandler groupingHandler;
- private Configuration configuration;
+ private final Configuration configuration;
private Map<Long, Queue> queues;
public PostOfficeJournalLoader(PostOffice postOffice,
@@ -113,50 +114,45 @@ public class PostOfficeJournalLoader implements JournalLoader {
public void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap,
List<QueueBindingInfo> queueBindingInfos) throws Exception {
int duplicateID = 0;
- for (QueueBindingInfo queueBindingInfo : queueBindingInfos) {
+ for (final QueueBindingInfo queueBindingInfo : queueBindingInfos) {
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
- Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
+ final Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
- boolean isTopicIdentification = filter != null && filter.getFilterString() != null &&
- filter.getFilterString().toString().equals(ActiveMQServerImpl.GENERIC_IGNORED_FILTER);
+ final boolean isTopicIdentification = FilterUtils.isTopicIdentification(filter);
if (postOffice.getBinding(queueBindingInfo.getQueueName()) != null) {
if (isTopicIdentification) {
- long tx = storageManager.generateID();
+ final long tx = storageManager.generateID();
storageManager.deleteQueueBinding(tx, queueBindingInfo.getId());
storageManager.commitBindings(tx);
continue;
}
else {
-
- SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++));
+ final SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++));
ActiveMQServerLogger.LOGGER.queueDuplicatedRenaming(queueBindingInfo.getQueueName().toString(), newName.toString());
queueBindingInfo.replaceQueueName(newName);
}
}
-
- PageSubscription subscription = null;
-
- if (!isTopicIdentification) {
- subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvider().createSubscription(queueBindingInfo.getId(), filter, true);
+ final QueueConfig.Builder queueConfigBuilder;
+ if (queueBindingInfo.getAddress() == null) {
+ queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName());
}
-
- Queue queue = queueFactory.createQueue(queueBindingInfo.getId(), queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), filter, subscription, queueBindingInfo.getUser(), true, false, queueBindingInfo.isAutoCreated());
-
- if (queueBindingInfo.isAutoCreated()) {
+ else {
+ queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress());
+ }
+ queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated());
+ final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
+ if (queue.isAutoCreated()) {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));
}
- Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
-
- queues.put(queueBindingInfo.getId(), queue);
-
+ final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
+ queues.put(queue.getID(), queue);
postOffice.addBinding(binding);
-
- managementService.registerAddress(queueBindingInfo.getAddress());
- managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
+ managementService.registerAddress(queue.getAddress());
+ managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 280bb13..d8f772d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -66,6 +67,20 @@ public class QueueFactoryImpl implements QueueFactory {
}
@Override
+ public Queue createQueueWith(final QueueConfig config) {
+ final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
+ final Queue queue;
+ if (addressSettings.isLastValueQueue()) {
+ queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ }
+ else {
+ queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ }
+ return queue;
+ }
+
+ @Deprecated
+ @Override
public Queue createQueue(final long persistenceID,
final SimpleString address,
final SimpleString name,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 4d4abd7..73aa20b 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -38,7 +38,8 @@
minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the
+ If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available
+ on the
classpath. If false then only the core protocol will be available, unless in Embedded mode where users
can inject their own Protocol Managers.
</xsd:documentation>
@@ -154,7 +155,8 @@
</xsd:element>
<xsd:element name="password-codec" type="xsd:string"
- default="org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec" maxOccurs="1" minOccurs="0">
+ default="org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec" maxOccurs="1"
+ minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Class name and its parameters for the Decoder used to decode the masked password. Ignored if
@@ -199,9 +201,9 @@
<xsd:element name="jmx-use-broker-name" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
- <xsd:documentation>
- Whether or not to use the broker name in the JMX properties
- </xsd:documentation>
+ <xsd:documentation>
+ Whether or not to use the broker name in the JMX properties
+ </xsd:documentation>
</xsd:annotation>
</xsd:element>
@@ -246,7 +248,8 @@
</xsd:annotation>
</xsd:element>
- <xsd:element name="configuration-file-refresh-period" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
+ <xsd:element name="configuration-file-refresh-period" type="xsd:long" default="5000" maxOccurs="1"
+ minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how often (in ms) to check the configuration file for modifications
@@ -432,7 +435,7 @@
<xsd:element name="queue" maxOccurs="unbounded" minOccurs="0">
<xsd:complexType>
<xsd:all>
- <xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="1">
+ <xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
address for the queue
@@ -679,7 +682,8 @@
<xsd:element name="global-max-size" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- Global Max Size before all addresses will enter into their Full Policy configured upon messages being produced.
+ Global Max Size before all addresses will enter into their Full Policy configured upon messages being
+ produced.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
@@ -1233,11 +1237,11 @@
<xsd:complexType name="clusterConnectionChoiceType">
<xsd:sequence>
- <xsd:choice maxOccurs="unbounded">
- <xsd:element name="cluster-connection-uri" type="cluster-connectionUriType"/>
- <xsd:element name="cluster-connection" type="cluster-connectionType">
- </xsd:element>
- </xsd:choice>
+ <xsd:choice maxOccurs="unbounded">
+ <xsd:element name="cluster-connection-uri" type="cluster-connectionUriType"/>
+ <xsd:element name="cluster-connection" type="cluster-connectionType">
+ </xsd:element>
+ </xsd:choice>
</xsd:sequence>
</xsd:complexType>
@@ -1350,7 +1354,7 @@
</xsd:element>
<xsd:element name="use-duplicate-detection" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
- <xsd:annotation >
+ <xsd:annotation>
<xsd:documentation>
should duplicate detection headers be inserted in forwarded messages?
</xsd:documentation>
@@ -1360,7 +1364,8 @@
<xsd:element name="forward-when-no-consumers" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- DEPRECATED: use message-load-balancing-type instead. Select STRICT to mimic forward-when-no-consumers=true
+ DEPRECATED: use message-load-balancing-type instead. Select STRICT to mimic
+ forward-when-no-consumers=true
and ON_DEMAND to mimic forward-when-no-consumers=false.
</xsd:documentation>
</xsd:annotation>
@@ -1858,7 +1863,8 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
- <xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0">
+ <xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1"
+ minOccurs="0">
<xsd:annotation>
<xsd:documentation>
The amount of time to wait for the replica to acknowledge it has received all the necessary data from
@@ -1923,11 +1929,13 @@
<xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back occurs
+ DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back
+ occurs
</xsd:documentation>
</xsd:annotation>
</xsd:element>
- <xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0">
+ <xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1"
+ minOccurs="0">
<xsd:annotation>
<xsd:documentation>
If we have to start as a replicated server this is the amount of time to wait for the replica to
@@ -2099,31 +2107,33 @@
</xsd:annotation>
</xsd:element>
<xsd:choice>
- <xsd:element name="discovery-group-ref" maxOccurs="1" minOccurs="0">
- <xsd:complexType>
- <xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required">
- <xsd:annotation>
- <xsd:documentation>
- The discovery group to use for scale down, if not supplied then the scale-down-connectors or first
- invm connector will be used
- </xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
- </xsd:complexType>
- </xsd:element>
- <xsd:element name="connectors" minOccurs="0" maxOccurs="1">
- <xsd:annotation>
- <xsd:documentation>
- A list of connectors to use for scaling down, if not supplied then the scale-down-discovery-group or
- first invm connector will be used
- </xsd:documentation>
- </xsd:annotation>
- <xsd:complexType>
- <xsd:sequence>
- <xsd:element name="connector-ref" type="xsd:string" maxOccurs="unbounded" minOccurs="1"/>
- </xsd:sequence>
- </xsd:complexType>
- </xsd:element>
+ <xsd:element name="discovery-group-ref" maxOccurs="1" minOccurs="0">
+ <xsd:complexType>
+ <xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required">
+ <xsd:annotation>
+ <xsd:documentation>
+ The discovery group to use for scale down, if not supplied then the scale-down-connectors or
+ first
+ invm connector will be used
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="connectors" minOccurs="0" maxOccurs="1">
+ <xsd:annotation>
+ <xsd:documentation>
+ A list of connectors to use for scaling down, if not supplied then the scale-down-discovery-group
+ or
+ first invm connector will be used
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="connector-ref" type="xsd:string" maxOccurs="unbounded" minOccurs="1"/>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
@@ -2252,15 +2262,19 @@
<xsd:element name="max-size-bytes" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and FAIL policies.
+ the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and
+ FAIL policies.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
- <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
+ <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1"
+ minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. Default = -1 (no limit).
+ used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before
+ messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only.
+ Default = -1 (no limit).
</xsd:documentation>
</xsd:annotation>
</xsd:element>
@@ -2491,11 +2505,11 @@
</xsd:complexType>
<xsd:complexType name="transportType">
- <xsd:simpleContent>
+ <xsd:simpleContent>
<xsd:extension base="xsd:string">
- <xsd:attribute name="name" type="xsd:string">
- </xsd:attribute>
+ <xsd:attribute name="name" type="xsd:string">
+ </xsd:attribute>
</xsd:extension>
- </xsd:simpleContent>
- </xsd:complexType>
+ </xsd:simpleContent>
+ </xsd:complexType>
</xsd:schema>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java
new file mode 100644
index 0000000..62e396d
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueueConfigTest {
+
+ @Test
+ public void addressMustBeDefaultedToName() {
+ final QueueConfig queueConfig = QueueConfig.builderWith(1L, new SimpleString("queue_name")).build();
+ Assert.assertEquals(queueConfig.name(), queueConfig.address());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void cannotAllowNullAddress() {
+ QueueConfig.builderWith(1L, new SimpleString("queue_name"), null);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void cannotAllowNullNameWithoutAddress() {
+ QueueConfig.builderWith(1L, null);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void cannotAllowNullNameWithAddress() {
+ QueueConfig.builderWith(1L, null, new SimpleString("queue_address"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 89432d9..4ddabec 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@@ -255,6 +256,13 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
@Override
+ public Queue createQueueWith(final QueueConfig config) {
+ queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ return queue;
+ }
+
+ @Deprecated
+ @Override
public Queue createQueue(final long persistenceID,
final SimpleString address,
final SimpleString name,
@@ -535,7 +543,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/
@Override
- public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+ public int sendLargeMessage(MessageReference reference,
+ ServerMessage message,
+ ServerConsumer consumer,
+ long bodySize,
+ int deliveryCount) {
return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
}
@@ -567,9 +579,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
class MyActiveMQServer extends ActiveMQServerImpl {
- MyActiveMQServer(Configuration configuration,
- MBeanServer mbeanServer,
- ActiveMQSecurityManager securityManager) {
+ MyActiveMQServer(Configuration configuration, MBeanServer mbeanServer, ActiveMQSecurityManager securityManager) {
super(configuration, mbeanServer, securityManager);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
index a4be3ad..f7b1c41 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java
@@ -33,18 +33,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
-import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -54,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCon
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@@ -61,7 +58,11 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.junit.Assert;
import org.junit.Before;
@@ -208,15 +209,12 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
server.stop();
}
-
-
@Test
public void testForcedInterruptUsingJMS() throws Exception {
ActiveMQServer server = createServer(true, isNetty());
server.start();
-
SimpleString jmsAddress = new SimpleString("jms.queue.Test");
server.createQueue(jmsAddress, jmsAddress, null, true, false);
@@ -265,7 +263,6 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
server.stop();
}
-
@Test
public void testSendNonPersistentQueue() throws Exception {
@@ -540,7 +537,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
}
}
- class NoPostACKQueueFactory implements QueueFactory {
+ final class NoPostACKQueueFactory implements QueueFactory {
final StorageManager storageManager;
@@ -565,6 +562,12 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase {
}
@Override
+ public Queue createQueueWith(final QueueConfig config) {
+ return new NoPostACKQueue(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, execFactory.getExecutor());
+ }
+
+ @Deprecated
+ @Override
public Queue createQueue(long persistenceID,
SimpleString address,
SimpleString name,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
----------------------------------------------------------------------
diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
index adf97fc..34433ff 100644
--- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
+++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
@@ -63,7 +64,8 @@ public class QueueConcurrentTest extends ActiveMQTestBase {
*/
@Test
public void testConcurrentAddsDeliver() throws Exception {
- QueueImpl queue = (QueueImpl) queueFactory.createQueue(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, null, false, false, false);
+
+ QueueImpl queue = (QueueImpl) queueFactory.createQueueWith(QueueConfig.builderWith(1, new SimpleString("address1"), new SimpleString("queue1")).durable(false).temporary(false).autoCreated(false).build());
FakeConsumer consumer = new FakeConsumer();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
index b507d3e..06c7e1e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
@@ -25,11 +25,12 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-public class FakeQueueFactory implements QueueFactory {
+public final class FakeQueueFactory implements QueueFactory {
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory());
@@ -38,6 +39,12 @@ public class FakeQueueFactory implements QueueFactory {
private PostOffice postOffice;
@Override
+ public Queue createQueueWith(final QueueConfig config) {
+ return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor);
+ }
+
+ @Deprecated
+ @Override
public Queue createQueue(final long persistenceID,
final SimpleString address,
final SimpleString name,