You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/28 20:17:17 UTC
[3/5] activemq-artemis git commit: Added ability to define 2
"delivery mode" types on a single address
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 184462b..5c43683 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -33,7 +33,9 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQEx
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
@@ -78,7 +80,9 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -87,7 +91,9 @@ import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
@@ -227,7 +233,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case CREATE_ADDRESS: {
CreateAddressMessage request = (CreateAddressMessage) packet;
requiresResponse = request.isRequiresResponse();
- session.createAddress(request.getAddress(), request.isMulticast(), request.isAutoCreated());
+ session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated());
if (requiresResponse) {
response = new NullResponseMessage();
}
@@ -236,7 +242,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case CREATE_QUEUE: {
CreateQueueMessage request = (CreateQueueMessage) packet;
requiresResponse = request.isRequiresResponse();
- session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable());
+ session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable());
if (requiresResponse) {
response = new NullResponseMessage();
}
@@ -245,7 +251,25 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case CREATE_QUEUE_V2: {
CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
requiresResponse = request.isRequiresResponse();
- session.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isTemporary(), request.isDurable(), null, null, request.isAutoCreated());
+ session.createQueue(request.getAddress(),
+ request.getQueueName(),
+ RoutingType.MULTICAST,
+ request.getFilterString(),
+ request.isTemporary(),
+ request.isDurable(),
+ Queue.MAX_CONSUMERS_UNLIMITED,
+ false,
+ request.isAutoCreated());
+ if (requiresResponse) {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
+ case CREATE_QUEUE_V3: {
+ CreateQueueMessage_V3 request = (CreateQueueMessage_V3) packet;
+ requiresResponse = request.isRequiresResponse();
+ session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isDeleteOnNoConsumers(),
+ request.isAutoCreated());
if (requiresResponse) {
response = new NullResponseMessage();
}
@@ -260,6 +284,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
break;
}
+ case CREATE_SHARED_QUEUE_V2: {
+ CreateSharedQueueMessage_V2 request = (CreateSharedQueueMessage_V2) packet;
+ requiresResponse = request.isRequiresResponse();
+ session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.isDurable(), request.getFilterString());
+ if (requiresResponse) {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
case DELETE_QUEUE: {
requiresResponse = true;
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 64e496a..39a6ac7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -233,7 +233,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
private void handleCreateQueue(final CreateQueueMessage request) {
try {
- server.createQueue(request.getAddress(), request.getQueueName(), request.getFilterString(), request.isDurable(), request.isTemporary());
+ server.createQueue(request.getAddress(), null, request.getQueueName(), request.getFilterString(), request.isDurable(), request.isTemporary());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToHandleCreateQueue(e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 5d39df0..1c20ba5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server;
import java.io.File;
+import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
@@ -45,7 +46,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.security.CheckType;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.jboss.logging.Messages;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.Message;
@@ -389,7 +389,7 @@ public interface ActiveMQMessageBundle {
ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName);
@Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType);
+ ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, RoutingType expectedRoutingType, Set<RoutingType> supportedRoutingTypes);
@Message(id = 119202, value = "Invalid Queue Configuration for Queue {0}, Address {1}. Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue);
@@ -402,4 +402,12 @@ public interface ActiveMQMessageBundle {
@Message(id = 119205, value = "Address {0} has bindings", format = Message.Format.MESSAGE_FORMAT)
ActiveMQDeleteAddressException addressHasBindings(SimpleString address);
+
+ @Message(id = 119206, value = "Queue {0} has invalid max consumer setting: {1}", format = Message.Format.MESSAGE_FORMAT)
+ IllegalArgumentException invalidMaxConsumers(String queueName, int value);
+
+ @Message(id = 119207, value = "Can not create queue with delivery mode: {0}, Supported delivery modes for address: {1} are {2}", format = Message.Format.MESSAGE_FORMAT)
+ IllegalArgumentException invalidRoutingTypeForAddress(RoutingType routingType,
+ String address,
+ Set<RoutingType> supportedRoutingTypes);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index c0f1b97..31a6080 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -256,6 +256,15 @@ public interface ActiveMQServer extends ActiveMQComponent {
*/
boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException;
+ Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+ SimpleString user,
+ boolean durable,
+ boolean temporary,
+ boolean autoCreated,
+ Integer maxConsumers,
+ Boolean deleteOnNoConsumers,
+ boolean autoCreateAddress) throws Exception;
+
/**
* Creates a transient queue. A queue that will exist as long as there are consumers.
* The queue will be deleted as soon as all the consumers are removed.
@@ -269,72 +278,54 @@ public interface ActiveMQServer extends ActiveMQComponent {
* @throws org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException if the shared queue already exists with a different {@code address} or {@code filterString}
* @throws NullPointerException if {@code address} is {@code null}
*/
- void createSharedQueue(final SimpleString address,
- final SimpleString name,
- final SimpleString filterString,
+ void createSharedQueue(final SimpleString address, final RoutingType routingType, final SimpleString name, final SimpleString filterString,
final SimpleString user,
boolean durable) throws Exception;
- Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
+ Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable,
boolean temporary) throws Exception;
- Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filterString,
+ @Deprecated
+ Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
+
+ Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
- Integer maxConsumers,
- Boolean deleteOnNoConsumers,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
- Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
+ Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary) throws Exception;
- Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
+ Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
- Integer maxConsumers,
- Boolean deleteOnNoConsumers,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
- Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
+ Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
boolean autoCreated) throws Exception;
- Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
- SimpleString user,
- boolean durable,
- boolean temporary,
- boolean autoCreated,
- Integer maxConsumers,
- Boolean deleteOnNoConsumers,
- boolean autoCreateAddress) throws Exception;
+ @Deprecated
+ Queue deployQueue(String address, String queue, String filter, boolean durable, boolean temporary) throws Exception;
- Queue deployQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filterString,
+ @Deprecated
+ Queue deployQueue(SimpleString address, SimpleString queue, SimpleString filter, boolean durable, boolean temporary) throws Exception;
+
+ Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString resourceName, SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
- Queue deployQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filterString,
+ Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
boolean autoCreated) throws Exception;
@@ -345,14 +336,12 @@ public interface ActiveMQServer extends ActiveMQComponent {
QueueQueryResult queueQuery(SimpleString name) throws Exception;
- Queue deployQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filterString,
+ Queue deployQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filterString,
boolean durable,
boolean temporary,
boolean autoCreated,
- Integer maxConsumers,
- Boolean deleteOnNoConsumers,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
void destroyQueue(SimpleString queueName) throws Exception;
@@ -408,6 +397,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
Queue createQueue(SimpleString addressName,
SimpleString queueName,
+ RoutingType routingType,
SimpleString filterString,
SimpleString user,
boolean durable,
@@ -415,14 +405,14 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean ignoreIfExists,
boolean transientQueue,
boolean autoCreated,
- Integer maxConsumers,
- Boolean deleteOnNoConsumers,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception;
/*
- * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
- * replace any factories with the same protocol
- * */
+ * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is true then this factory will
+ * replace any factories with the same protocol
+ * */
void addProtocolManagerFactory(ProtocolManagerFactory factory);
/*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 2b845d5..cf044f1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -32,6 +32,8 @@ import org.apache.activemq.artemis.utils.ReferenceCounter;
public interface Queue extends Bindable {
+ int MAX_CONSUMERS_UNLIMITED = -1;
+
SimpleString getName();
long getID();
@@ -40,6 +42,10 @@ public interface Queue extends Bindable {
PageSubscription getPageSubscription();
+ RoutingType getRoutingType();
+
+ void setRoutingType(RoutingType routingType);
+
boolean isDurable();
boolean isTemporary();
@@ -233,6 +239,7 @@ public interface Queue extends Bindable {
/**
* if the pause was persisted
+ *
* @return
*/
boolean isPersistedPause();
@@ -283,4 +290,5 @@ public interface Queue extends Bindable {
SimpleString getUser();
void decDelivering(int size);
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index 81834be..3435ca0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.FilterUtils;
@@ -33,7 +34,8 @@ public final class QueueConfig {
private final boolean durable;
private final boolean temporary;
private final boolean autoCreated;
- private final Integer maxConsumers;
+ private final RoutingType routingType;
+ private final int maxConsumers;
private final boolean deleteOnNoConsumers;
public static final class Builder {
@@ -47,7 +49,8 @@ public final class QueueConfig {
private boolean durable;
private boolean temporary;
private boolean autoCreated;
- private Integer maxConsumers;
+ private RoutingType routingType;
+ private int maxConsumers;
private boolean deleteOnNoConsumers;
private Builder(final long id, final SimpleString name) {
@@ -64,8 +67,9 @@ public final class QueueConfig {
this.durable = true;
this.temporary = false;
this.autoCreated = true;
- this.maxConsumers = -1;
- this.deleteOnNoConsumers = false;
+ this.routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType();
+ this.maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
+ this.deleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
validateState();
}
@@ -112,7 +116,7 @@ public final class QueueConfig {
return this;
}
- public Builder maxConsumers(final Integer maxConsumers) {
+ public Builder maxConsumers(final int maxConsumers) {
this.maxConsumers = maxConsumers;
return this;
}
@@ -122,6 +126,11 @@ public final class QueueConfig {
return this;
}
+ public Builder deliveryMode(RoutingType routingType) {
+ this.routingType = routingType;
+ return this;
+ }
+
/**
* Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}.
* <br>
@@ -143,7 +152,7 @@ public final class QueueConfig {
} else {
pageSubscription = null;
}
- return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers);
+ return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, deleteOnNoConsumers);
}
}
@@ -185,7 +194,8 @@ public final class QueueConfig {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
- final Integer maxConsumers,
+ final RoutingType routingType,
+ final int maxConsumers,
final boolean deleteOnNoConsumers) {
this.id = id;
this.address = address;
@@ -196,6 +206,7 @@ public final class QueueConfig {
this.durable = durable;
this.temporary = temporary;
this.autoCreated = autoCreated;
+ this.routingType = routingType;
this.deleteOnNoConsumers = deleteOnNoConsumers;
this.maxConsumers = maxConsumers;
}
@@ -240,10 +251,14 @@ public final class QueueConfig {
return deleteOnNoConsumers;
}
- public Integer maxConsumers() {
+ public int maxConsumers() {
return maxConsumers;
}
+ public RoutingType deliveryMode() {
+ return routingType;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o)
@@ -269,6 +284,8 @@ public final class QueueConfig {
return false;
if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null)
return false;
+ if (routingType != that.routingType)
+ return false;
if (maxConsumers != that.maxConsumers)
return false;
if (deleteOnNoConsumers != that.deleteOnNoConsumers)
@@ -288,6 +305,7 @@ public final class QueueConfig {
result = 31 * result + (durable ? 1 : 0);
result = 31 * result + (temporary ? 1 : 0);
result = 31 * result + (autoCreated ? 1 : 0);
+ result = 31 * result + routingType.getType();
result = 31 * result + maxConsumers;
result = 31 * result + (deleteOnNoConsumers ? 1 : 0);
return result;
@@ -305,6 +323,7 @@ public final class QueueConfig {
+ ", durable=" + durable
+ ", temporary=" + temporary
+ ", autoCreated=" + autoCreated
+ + ", routingType=" + routingType
+ ", maxConsumers=" + maxConsumers
+ ", deleteOnNoConsumers=" + deleteOnNoConsumers + '}';
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 23426ca..badadf4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -104,11 +104,48 @@ public interface ServerSession extends SecurityAuth {
Queue createQueue(SimpleString address,
SimpleString name,
+ RoutingType routingType,
SimpleString filterString,
boolean temporary,
boolean durable) throws Exception;
- AddressInfo createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws Exception;
+ /** Create queue with default delivery mode
+ *
+ * @param address
+ * @param name
+ * @param filterString
+ * @param temporary
+ * @param durable
+ * @return
+ * @throws Exception
+ */
+ Queue createQueue(SimpleString address,
+ SimpleString name,
+ SimpleString filterString,
+ boolean temporary,
+ boolean durable) throws Exception;
+
+ Queue createQueue(SimpleString address,
+ SimpleString name,
+ RoutingType routingType,
+ SimpleString filterString,
+ boolean temporary,
+ boolean durable,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
+ boolean autoCreated) throws Exception;
+
+ Queue createQueue(SimpleString address,
+ SimpleString name,
+ RoutingType routingType,
+ SimpleString filterString,
+ boolean temporary,
+ boolean durable,
+ boolean autoCreated) throws Exception;
+
+ AddressInfo createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws Exception;
+
+ AddressInfo createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws Exception;
void deleteQueue(SimpleString name) throws Exception;
@@ -186,14 +223,11 @@ public interface ServerSession extends SecurityAuth {
boolean isClosed();
- Queue createQueue(SimpleString address,
- SimpleString name,
- SimpleString filterString,
- boolean temporary,
- boolean durable,
- Integer maxConsumers,
- Boolean deleteOnNoConsumers,
- final Boolean autoCreated) throws Exception;
+ void createSharedQueue(SimpleString address,
+ SimpleString name,
+ final RoutingType routingType,
+ boolean durable,
+ SimpleString filterString) throws Exception;
void createSharedQueue(SimpleString address,
SimpleString name,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 5ee94f0..2ae2329 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
@@ -719,7 +720,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
} else {
// Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
// actually routed to at that address though
- queue = server.createQueue(queueName, queueName, null, true, false);
+ queue = server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false);
}
// There are a few things that will behave differently when it's an internal queue
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index b2cd7f8..9128424 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -109,6 +109,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -1392,11 +1393,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public Queue createQueue(final SimpleString address,
+ final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
- return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false);
+ return createQueue(address, routingType, queueName, filterString, null, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
}
@Override
@@ -1404,51 +1406,61 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
+ final boolean temporary) throws Exception {
+ return createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable, temporary);
+ }
+
+ @Override
+ public Queue createQueue(final SimpleString address,
+ final RoutingType routingType,
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean durable,
final boolean temporary,
- final Integer maxConsumers,
- final Boolean deleteOnNoConsumers,
+ final int maxConsumers,
+ final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
- return createQueue(address, queueName, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+ return createQueue(address, queueName, routingType, filterString, null, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public Queue createQueue(final SimpleString address,
+ final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary) throws Exception {
- return createQueue(address, queueName, filterString, user, durable, temporary, false, false, false);
+ return createQueue(address, routingType, queueName, filterString, user, durable, temporary, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
}
@Override
- public Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
+ public Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
- Integer maxConsumers,
- Boolean deleteOnNoConsumers,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception {
- return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+ return createQueue(address, queueName, routingType, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
public Queue createQueue(final SimpleString address,
+ final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
- return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated);
+ return createQueue(address, routingType, queueName, filterString, user, durable, temporary,
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), autoCreated);
}
@Override
- public Queue createQueue(SimpleString address,
- SimpleString queueName,
- SimpleString filter,
+ public Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user,
boolean durable,
boolean temporary,
@@ -1456,20 +1468,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Integer maxConsumers,
Boolean deleteOnNoConsumers,
boolean autoCreateAddress) throws Exception {
- return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+ return createQueue(address, queueName, routingType, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
- public void createSharedQueue(final SimpleString address,
- final SimpleString name,
- final SimpleString filterString,
+ public void createSharedQueue(final SimpleString address, RoutingType routingType, final SimpleString name, final SimpleString filterString,
final SimpleString user,
boolean durable) throws Exception {
//force the old contract about address
if (address == null) {
throw new NullPointerException("address can't be null!");
}
- final Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false);
+
+ if (routingType == null) {
+ AddressInfo addressInfo = getAddressInfo(address);
+ routingType = addressInfo.getRoutingTypes().size() == 1 ? addressInfo.getRoutingType() : ActiveMQDefaultConfiguration.getDefaultRoutingType();
+ if (routingType == null) {
+ // TODO (mtaylor) throw exception Can not determine routing type info from address
+ }
+ }
+
+ final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, false);
if (!queue.getAddress().equals(address)) {
throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);
@@ -1508,34 +1527,55 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
- return deployQueue(address, resourceName, filterString, durable, temporary, false);
+ return deployQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), resourceName, filterString, durable, temporary, false);
+ }
+
+ @Override
+ public Queue deployQueue(final String address,
+ final String resourceName,
+ final String filterString,
+ final boolean durable,
+ final boolean temporary) throws Exception {
+ return deployQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(resourceName), SimpleString.toSimpleString(filterString), durable, temporary);
+ }
+
+ @Override
+ public Queue deployQueue(final SimpleString address,
+ final RoutingType routingType,
+ final SimpleString resourceName,
+ final SimpleString filterString,
+ final boolean durable,
+ final boolean temporary) throws Exception {
+ return deployQueue(address, routingType, resourceName, filterString, durable, temporary, false);
}
@Override
public Queue deployQueue(final SimpleString address,
+ final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated) throws Exception {
- return deployQueue(address, queueName, filterString, durable, temporary, autoCreated, null, null, true);
+ return deployQueue(address, routingType, queueName, filterString, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), true);
}
@Override
public Queue deployQueue(final SimpleString address,
+ final RoutingType routingType,
final SimpleString queueName,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
final boolean autoCreated,
- final Integer maxConsumers,
- final Boolean deleteOnNoConsumers,
+ final int maxConsumers,
+ final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
// TODO: fix logging here as this could be for a topic or queue
ActiveMQServerLogger.LOGGER.deployQueue(queueName);
- return createQueue(address, queueName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+ return createQueue(address, queueName, routingType, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@Override
@@ -2139,11 +2179,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployAddressesFromConfiguration() throws Exception {
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
- AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()));
- info.setRoutingType(config.getRoutingType());
- info.setDefaultDeleteOnNoConsumers(config.getDefaultDeleteOnNoConsumers());
- info.setDefaultMaxQueueConsumers(config.getDefaultMaxConsumers());
-
+ AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
createOrUpdateAddressInfo(info);
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
}
@@ -2151,7 +2187,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
- deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true);
+ deployQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers(), true);
}
}
@@ -2308,21 +2344,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return postOffice.getAddressInfo(address);
}
- private Queue createQueue(final SimpleString addressName,
- final SimpleString queueName,
- final SimpleString filterString,
- final SimpleString user,
- final boolean durable,
- final boolean temporary,
- final boolean ignoreIfExists,
- final boolean transientQueue,
- final boolean autoCreated) throws Exception {
- return createQueue(addressName, queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, null, null, true);
- }
-
@Override
public Queue createQueue(final SimpleString addressName,
final SimpleString queueName,
+ final RoutingType routingType,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
@@ -2330,8 +2355,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean ignoreIfExists,
final boolean transientQueue,
final boolean autoCreated,
- final Integer maxConsumers,
- final Boolean deleteOnNoConsumers,
+ final int maxConsumers,
+ final boolean deleteOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
@@ -2356,27 +2381,32 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
AddressInfo defaultAddressInfo = new AddressInfo(addressName);
+ defaultAddressInfo.addRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType());
AddressInfo info = postOffice.getAddressInfo(addressName);
if (info == null) {
if (autoCreateAddress) {
- info = defaultAddressInfo;
+ postOffice.addAddressInfo(defaultAddressInfo);
+ info = postOffice.getAddressInfo(addressName);
} else {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressName);
}
}
- final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
- final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxQueueConsumers() : maxConsumers;
+ final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deliveryMode(routingType).maxConsumers(maxConsumers).deleteOnNoConsumers(deleteOnNoConsumers).build();
- final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).deleteOnNoConsumers(isDeleteOnNoConsumers).maxConsumers(noMaxConsumers).build();
final Queue queue = queueFactory.createQueueWith(queueConfig);
boolean addressAlreadyExists = true;
- if (postOffice.getAddressInfo(queue.getAddress()) == null) {
- postOffice.addAddressInfo(new AddressInfo(queue.getAddress()).setRoutingType(AddressInfo.RoutingType.MULTICAST).setDefaultMaxQueueConsumers(maxConsumers == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers));
+ AddressInfo addressInfo = postOffice.getAddressInfo(queue.getAddress());
+ if (addressInfo == null) {
+ postOffice.addAddressInfo(new AddressInfo(queue.getAddress()));
addressAlreadyExists = false;
+ } else {
+ if (!addressInfo.getRoutingTypes().contains(routingType)) {
+ throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, addressInfo.getName().toString(), addressInfo.getRoutingTypes());
+ }
}
if (transientQueue) {
@@ -2385,7 +2415,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this, queue.getName()));
}
- final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
+ final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 9653a4e..64d6dd5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -16,8 +16,12 @@
*/
package org.apache.activemq.artemis.core.server.impl;
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
public class AddressInfo {
@@ -25,52 +29,36 @@ public class AddressInfo {
private final SimpleString name;
- private RoutingType routingType = RoutingType.MULTICAST;
-
- private boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
-
- private int defaultMaxQueueConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
-
private boolean autoCreated = false;
private boolean deletable = false;
+ private Set<RoutingType> routingTypes;
+
public AddressInfo(SimpleString name) {
this.name = name;
+ routingTypes = new HashSet<>();
}
- public AddressInfo(SimpleString name, RoutingType routingType, boolean defaultDeleteOnNoConsumers, int defaultMaxConsumers) {
- this(name);
- this.routingType = routingType;
- this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
- this.defaultMaxQueueConsumers = defaultMaxConsumers;
- }
-
- public RoutingType getRoutingType() {
- return routingType;
- }
-
- public AddressInfo setRoutingType(RoutingType routingType) {
- this.routingType = routingType;
- return this;
- }
-
- public boolean isDefaultDeleteOnNoConsumers() {
- return defaultDeleteOnNoConsumers;
- }
-
- public AddressInfo setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) {
- this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers;
- return this;
- }
-
- public int getDefaultMaxQueueConsumers() {
- return defaultMaxQueueConsumers;
+ /**
+ * Creates an AddressInfo object with a Set of routing types
+ * @param name
+ * @param routingTypes
+ */
+ public AddressInfo(SimpleString name, Set<RoutingType> routingTypes) {
+ this.name = name;
+ this.routingTypes = routingTypes;
}
- public AddressInfo setDefaultMaxQueueConsumers(int defaultMaxQueueConsumers) {
- this.defaultMaxQueueConsumers = defaultMaxQueueConsumers;
- return this;
+ /**
+ * Creates an AddressInfo object with a single RoutingType associated with it.
+ * @param name
+ * @param routingType
+ */
+ public AddressInfo(SimpleString name, RoutingType routingType) {
+ this.name = name;
+ this.routingTypes = new HashSet<>();
+ routingTypes.add(routingType);
}
public boolean isAutoCreated() {
@@ -94,42 +82,47 @@ public class AddressInfo {
return id;
}
+ public Set<RoutingType> getRoutingTypes() {
+ return routingTypes;
+ }
+
+ public AddressInfo setRoutingTypes(Set<RoutingType> routingTypes) {
+ this.routingTypes = routingTypes;
+ return this;
+ }
+
+ public AddressInfo addRoutingType(RoutingType routingType) {
+ if (routingTypes == null) {
+ routingTypes = new HashSet<>();
+ }
+ routingTypes.add(routingType);
+ return this;
+ }
+
+ public RoutingType getRoutingType() {
+ /* We use a Set to guarantee that only a single entry each for ANYCAST and MULTICAST can be added to routing types.
+ There are cases where we also want to get any routing type (when a queue doesn't specify its routing type, for
+ example). For this reason we return the first element yielded by the Set's iterator.
+ */
+ // TODO There must be a better way of doing this. This creates an iterator on each lookup.
+ for (RoutingType routingType : routingTypes) {
+ return routingType;
+ }
+ return null;
+ }
+
@Override
public String toString() {
StringBuffer buff = new StringBuffer();
buff.append("Address [name=" + name);
buff.append(", id=" + id);
- buff.append(", routingType=" + routingType);
- buff.append(", defaultMaxQueueConsumers=" + defaultMaxQueueConsumers);
- buff.append(", defaultDeleteOnNoConsumers=" + defaultDeleteOnNoConsumers);
+ buff.append(", routingTypes={");
+ for (RoutingType routingType : routingTypes) {
+ buff.append(routingType.toString() + ",");
+ }
buff.append(", autoCreated=" + autoCreated);
buff.append("]");
return buff.toString();
}
- public enum RoutingType {
- MULTICAST, ANYCAST;
-
- public byte getType() {
- switch (this) {
- case MULTICAST:
- return 0;
- case ANYCAST:
- return 1;
- default:
- return -1;
- }
- }
-
- public static RoutingType getType(byte type) {
- switch (type) {
- case 0:
- return MULTICAST;
- case 1:
- return ANYCAST;
- default:
- return null;
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index a4fa5dc..8896aa4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -56,6 +57,7 @@ public class LastValueQueue extends QueueImpl {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
+ final RoutingType routingType,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers,
final ScheduledExecutorService scheduledExecutor,
@@ -63,7 +65,7 @@ public class LastValueQueue extends QueueImpl {
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
- super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
new Exception("LastValueQeue " + this).toString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index eb31737..20ef545 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -160,7 +160,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
}
}
- final Binding binding = new LocalQueueBinding(postOffice.getAddressInfo(queue.getAddress()), queue, nodeManager.getNodeId());
+ final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
queues.put(queue.getID(), queue);
postOffice.addBinding(binding);
@@ -178,9 +178,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
// TODO: figure out what else to set here
AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName())
- .setRoutingType(addressBindingInfo.getRoutingType())
- .setDefaultMaxQueueConsumers(addressBindingInfo.getDefaultMaxConsumers());
-
+ .setRoutingTypes(addressBindingInfo.getRoutingTypes());
postOffice.addAddressInfo(addressInfo);
managementService.registerAddress(addressInfo);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index bcc7c79..46beee7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
@@ -75,9 +76,9 @@ public class QueueFactoryImpl implements QueueFactory {
final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
final Queue queue;
if (addressSettings.isLastValueQueue()) {
- queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
} else {
- queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
}
return queue;
}
@@ -101,7 +102,7 @@ public class QueueFactoryImpl implements QueueFactory {
Queue queue;
if (addressSettings.isLastValueQueue()) {
- queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+ queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
} else {
queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a2be58b..4a06ce1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@@ -247,6 +248,8 @@ public class QueueImpl implements Queue {
private final AtomicInteger noConsumers = new AtomicInteger(0);
+ private RoutingType routingType;
+
/**
* This is to avoid multi-thread races on calculating direct delivery,
* to guarantee ordering will be always be correct
@@ -343,7 +346,7 @@ public class QueueImpl implements Queue {
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final Executor executor) {
- this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+ this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, RoutingType.MULTICAST, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
}
public QueueImpl(final long id,
@@ -355,6 +358,7 @@ public class QueueImpl implements Queue {
final boolean durable,
final boolean temporary,
final boolean autoCreated,
+ final RoutingType routingType,
final Integer maxConsumers,
final Boolean deleteOnNoConsumers,
final ScheduledExecutorService scheduledExecutor,
@@ -369,6 +373,8 @@ public class QueueImpl implements Queue {
this.addressInfo = postOffice == null ? null : postOffice.getAddressInfo(address);
+ this.routingType = routingType;
+
this.name = name;
this.filter = filter;
@@ -381,9 +387,9 @@ public class QueueImpl implements Queue {
this.autoCreated = autoCreated;
- this.maxConsumers = maxConsumers == null ? (addressInfo == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : addressInfo.getDefaultMaxQueueConsumers()) : maxConsumers;
+ this.maxConsumers = maxConsumers == null ? ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers() : maxConsumers;
- this.deleteOnNoConsumers = deleteOnNoConsumers == null ? (addressInfo == null ? ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers() : addressInfo.isDefaultDeleteOnNoConsumers()) : deleteOnNoConsumers;
+ this.deleteOnNoConsumers = deleteOnNoConsumers == null ? ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers() : deleteOnNoConsumers;
this.postOffice = postOffice;
@@ -502,6 +508,18 @@ public class QueueImpl implements Queue {
}
@Override
+ public RoutingType getRoutingType() {
+ return routingType;
+ }
+
+ @Override
+ public void setRoutingType(RoutingType routingType) {
+ if (addressInfo.getRoutingTypes().contains(routingType)) {
+ this.routingType = routingType;
+ }
+ }
+
+ @Override
public Filter getFilter() {
return filter;
}
@@ -755,7 +773,7 @@ public class QueueImpl implements Queue {
synchronized (this) {
- if (maxConsumers != -1 && noConsumers.get() >= maxConsumers) {
+ if (maxConsumers != MAX_CONSUMERS_UNLIMITED && noConsumers.get() >= maxConsumers) {
throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index e6de6cd..89d110e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -65,6 +66,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@@ -488,18 +490,44 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString filterString,
final boolean temporary,
final boolean durable) throws Exception {
- return createQueue(address, name, filterString, temporary, durable, null, null, false);
+ return createQueue(address,
+ name,
+ ActiveMQDefaultConfiguration.getDefaultRoutingType(),
+ filterString,
+ temporary,
+ durable,
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ false);
}
@Override
public Queue createQueue(final SimpleString address,
final SimpleString name,
+ final RoutingType routingType,
+ final SimpleString filterString,
+ final boolean temporary,
+ final boolean durable) throws Exception {
+ return createQueue(address,
+ name, routingType,
+ filterString,
+ temporary,
+ durable,
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ false);
+ }
+
+ @Override
+ public Queue createQueue(final SimpleString address,
+ final SimpleString name,
+ final RoutingType routingType,
final SimpleString filterString,
final boolean temporary,
final boolean durable,
- final Integer maxConsumers,
- final Boolean deleteOnNoConsumers,
- final Boolean autoCreated) throws Exception {
+ final int maxConsumers,
+ final boolean deleteOnNoConsumers,
+ final boolean autoCreated) throws Exception {
if (durable) {
// make sure the user has privileges to create this queue
securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this);
@@ -509,7 +537,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
server.checkQueueCreationLimit(getUsername());
- Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true);
+ Queue queue = server.createQueue(address, routingType, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, true);
if (temporary) {
// Temporary queue in core simply means the queue will be deleted if
@@ -537,25 +565,54 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
- public AddressInfo createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws Exception {
- securityCheck(address, CheckType.CREATE_ADDRESS, this);
- AddressInfo.RoutingType routingType = multicast ? AddressInfo.RoutingType.MULTICAST : AddressInfo.RoutingType.ANYCAST;
+ public Queue createQueue(SimpleString address,
+ SimpleString name,
+ RoutingType routingType,
+ SimpleString filterString,
+ boolean temporary,
+ boolean durable,
+ boolean autoCreated) throws Exception {
+ return createQueue(address,
+ name, routingType,
+ filterString,
+ temporary,
+ durable,
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ autoCreated);
+ }
- AddressInfo addressInfo = server.createOrUpdateAddressInfo(new AddressInfo(address).setRoutingType(routingType).setAutoCreated(autoCreated));
+ @Override
+ public AddressInfo createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws Exception {
+ securityCheck(address, CheckType.CREATE_ADDRESS, this);
+ return server.createOrUpdateAddressInfo(new AddressInfo(address, routingTypes).setAutoCreated(autoCreated));
+ }
- return addressInfo;
+ @Override
+ public AddressInfo createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws Exception {
+ securityCheck(address, CheckType.CREATE_ADDRESS, this);
+ return server.createOrUpdateAddressInfo(new AddressInfo(address, routingType).setAutoCreated(autoCreated));
}
@Override
public void createSharedQueue(final SimpleString address,
final SimpleString name,
+ final RoutingType routingType,
boolean durable,
final SimpleString filterString) throws Exception {
securityCheck(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
server.checkQueueCreationLimit(getUsername());
- server.createSharedQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable);
+ server.createSharedQueue(address, routingType, name, filterString, SimpleString.toSimpleString(getUsername()), durable);
+ }
+
+ @Override
+ public void createSharedQueue(final SimpleString address,
+ final SimpleString name,
+ boolean durable,
+ final SimpleString filterString) throws Exception {
+ createSharedQueue(address, name, null, durable, filterString);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index bfb0054..7069c09 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2569,12 +2569,6 @@
<!-- 2.0 Addressing configuration -->
- <xsd:simpleType name="routingType">
- <xsd:restriction base="xsd:string">
- <xsd:enumeration value="multicast" />
- <xsd:enumeration value="anycast" />
- </xsd:restriction>
- </xsd:simpleType>
<xsd:complexType name="queueType">
<xsd:all>
@@ -2588,7 +2582,19 @@
<xsd:complexType name="addressType">
<xsd:all>
- <xsd:element name="queues" maxOccurs="1" minOccurs="0">
+ <xsd:element name="anycast" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ a list of pre configured queues to create
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="queue" type="queueType" maxOccurs="unbounded" minOccurs="0" />
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="multicast" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
a list of pre configured queues to create
@@ -2608,31 +2614,6 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
- <xsd:attribute name="type" type="routingType" use="required">
- <xsd:annotation>
- <xsd:documentation>
- The address name to matches incoming message addresses
- </xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
- <xsd:attribute name="default-max-consumers" type="xsd:int" use="optional" default="-1">
- <xsd:annotation>
- <xsd:documentation>
- The default value of max-consumers applied to all queues that are
- auto-created under this address. Also applies to any queues that do not
- specify a value for max-consumers.
- </xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
- <xsd:attribute name="default-delete-on-no-consumers" type="xsd:boolean" use="optional" default="false">
- <xsd:annotation>
- <xsd:documentation>
- The default value of delete-on-no-consumers applied to all queues that are
- auto-created under this address. Also applies to any queues that do not
- specify a value for delete-on-no-consumers.
- </xsd:documentation>
- </xsd:annotation>
- </xsd:attribute>
</xsd:complexType>
<xsd:complexType name="addressesType">
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index cbd2e65..fd65d26 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -23,10 +23,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -43,7 +45,9 @@ import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin;
@@ -51,9 +55,6 @@ import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.junit.Assert;
import org.junit.Test;
-import static org.apache.activemq.artemis.core.server.impl.AddressInfo.RoutingType.ANYCAST;
-import static org.apache.activemq.artemis.core.server.impl.AddressInfo.RoutingType.MULTICAST;
-
public class FileConfigurationTest extends ConfigurationImplTest {
private final String fullConfigurationName = "ConfigurationTest-full-config.xml";
@@ -366,12 +367,14 @@ public class FileConfigurationTest extends ConfigurationImplTest {
}
private void verifyAddresses() {
- assertEquals(2, conf.getAddressConfigurations().size());
+ assertEquals(3, conf.getAddressConfigurations().size());
// Addr 1
CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0);
assertEquals("addr1", addressConfiguration.getName());
- assertEquals(ANYCAST, addressConfiguration.getRoutingType());
+ Set<RoutingType> routingTypes = new HashSet<>();
+ routingTypes.add(RoutingType.ANYCAST);
+ assertEquals(routingTypes, addressConfiguration.getRoutingTypes());
assertEquals(2, addressConfiguration.getQueueConfigurations().size());
// Addr 1 Queue 1
@@ -380,9 +383,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("q1", queueConfiguration.getName());
assertFalse(queueConfiguration.isDurable());
assertEquals("color='blue'", queueConfiguration.getFilterString());
- assertEquals(addressConfiguration.getDefaultDeleteOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers());
+ assertEquals(ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers());
assertEquals("addr1", queueConfiguration.getAddress());
- assertEquals(addressConfiguration.getDefaultMaxConsumers(), queueConfiguration.getMaxConsumers());
+ assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers());
// Addr 1 Queue 2
queueConfiguration = addressConfiguration.getQueueConfigurations().get(1);
@@ -390,14 +393,16 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("q2", queueConfiguration.getName());
assertTrue(queueConfiguration.isDurable());
assertEquals("color='green'", queueConfiguration.getFilterString());
- assertEquals(new Integer(-1), queueConfiguration.getMaxConsumers());
+ assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers());
assertFalse(queueConfiguration.getDeleteOnNoConsumers());
assertEquals("addr1", queueConfiguration.getAddress());
// Addr 2
addressConfiguration = conf.getAddressConfigurations().get(1);
assertEquals("addr2", addressConfiguration.getName());
- assertEquals(MULTICAST, addressConfiguration.getRoutingType());
+ routingTypes = new HashSet<>();
+ routingTypes.add(RoutingType.MULTICAST);
+ assertEquals(routingTypes, addressConfiguration.getRoutingTypes());
assertEquals(2, addressConfiguration.getQueueConfigurations().size());
// Addr 2 Queue 1
@@ -406,8 +411,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("q3", queueConfiguration.getName());
assertTrue(queueConfiguration.isDurable());
assertEquals("color='red'", queueConfiguration.getFilterString());
- assertEquals(new Integer(10), queueConfiguration.getMaxConsumers());
- assertEquals(addressConfiguration.getDefaultDeleteOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers());
+ assertEquals(10, queueConfiguration.getMaxConsumers());
+ assertEquals(ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), queueConfiguration.getDeleteOnNoConsumers());
assertEquals("addr2", queueConfiguration.getAddress());
// Addr 2 Queue 2
@@ -416,9 +421,17 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("q4", queueConfiguration.getName());
assertTrue(queueConfiguration.isDurable());
assertNull(queueConfiguration.getFilterString());
- assertEquals(addressConfiguration.getDefaultMaxConsumers(), queueConfiguration.getMaxConsumers());
+ assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers());
assertTrue(queueConfiguration.getDeleteOnNoConsumers());
assertEquals("addr2", queueConfiguration.getAddress());
+
+ // Addr 3
+ addressConfiguration = conf.getAddressConfigurations().get(2);
+ assertEquals("addr2", addressConfiguration.getName());
+ routingTypes = new HashSet<>();
+ routingTypes.add(RoutingType.MULTICAST);
+ routingTypes.add(RoutingType.ANYCAST);
+ assertEquals(routingTypes, addressConfiguration.getRoutingTypes());
}
@Test
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
index 8feed19..1638faa 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
@@ -50,7 +51,10 @@ public class MessagePropertyTest extends ActiveMQTestBase {
private void sendMessages() throws Exception {
ClientSession session = sf.createSession(true, true);
- session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ String filter = null;
+ session.createAddress(SimpleString.toSimpleString(ADDRESS), RoutingType.MULTICAST, false);
+ session.createQueue(ADDRESS, RoutingType.MULTICAST, ADDRESS, filter, true);
ClientProducer producer = session.createProducer(ADDRESS);
for (int i = 0; i < numMessages; i++) {