You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/28 20:17:19 UTC
[5/5] activemq-artemis git commit: Added ability to define 2
"delivery mode" types on a single address
Added ability to define 2 "delivery mode" types on a single address
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dad04960
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dad04960
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dad04960
Branch: refs/heads/ARTEMIS-780
Commit: dad0496063fa91896bc065ecadcc81b5f31fabac
Parents: 2198869
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Nov 25 13:06:21 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Nov 28 20:16:37 2016 +0000
----------------------------------------------------------------------
.../cli/commands/address/CreateAddress.java | 20 +-
.../config/ActiveMQDefaultConfiguration.java | 7 +
.../artemis/api/core/client/ClientSession.java | 201 ++++++++++++-
.../core/management/ActiveMQServerControl.java | 42 +--
.../api/core/management/AddressControl.java | 9 +-
.../core/client/impl/ClientSessionImpl.java | 290 +++++++++++++++++--
.../core/impl/ActiveMQSessionContext.java | 41 ++-
.../core/protocol/core/impl/PacketDecoder.java | 12 +
.../core/protocol/core/impl/PacketImpl.java | 4 +
.../impl/wireformat/CreateAddressMessage.java | 39 ++-
.../impl/wireformat/CreateQueueMessage_V2.java | 6 +-
.../impl/wireformat/CreateQueueMessage_V3.java | 134 +++++++++
.../wireformat/CreateSharedQueueMessage.java | 16 +-
.../wireformat/CreateSharedQueueMessage_V2.java | 134 +++++++++
.../artemis/core/server/RoutingType.java | 44 +++
.../spi/core/remoting/SessionContext.java | 35 ++-
.../jms/client/ActiveMQMessageProducer.java | 9 +-
.../artemis/jms/client/ActiveMQSession.java | 17 +-
.../jms/server/impl/JMSServerManagerImpl.java | 10 +-
.../artemis/junit/ActiveMQConsumerResource.java | 6 +
.../artemis/junit/EmbeddedActiveMQResource.java | 6 +-
.../protocol/mqtt/MQTTSubscriptionManager.java | 7 +-
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 7 +-
.../core/protocol/openwire/amq/AMQSession.java | 2 +-
.../core/protocol/stomp/StompConnection.java | 22 +-
.../core/protocol/stomp/StompSession.java | 6 +-
.../stomp/VersionedStompFrameHandler.java | 16 +-
.../artemis/ra/ActiveMQRAMessageProducer.java | 4 +-
.../artemis/rest/test/FindDestinationTest.java | 5 +-
.../activemq/artemis/rest/test/RawAckTest.java | 8 +-
.../core/config/CoreAddressConfiguration.java | 88 +-----
.../core/config/CoreQueueConfiguration.java | 23 +-
.../artemis/core/config/impl/Validators.java | 10 +
.../deployers/impl/FileConfigurationParser.java | 49 ++--
.../impl/ActiveMQServerControlImpl.java | 34 +--
.../management/impl/AddressControlImpl.java | 5 +-
.../core/persistence/AddressBindingInfo.java | 9 +-
.../journal/AbstractJournalStorageManager.java | 4 +-
.../codec/PersistentAddressBindingEncoding.java | 68 ++---
.../core/postoffice/impl/LocalQueueBinding.java | 25 +-
.../postoffice/impl/SimpleAddressManager.java | 7 +-
.../core/ServerSessionPacketHandler.java | 39 ++-
.../core/impl/ActiveMQPacketHandler.java | 2 +-
.../core/server/ActiveMQMessageBundle.java | 12 +-
.../artemis/core/server/ActiveMQServer.java | 86 +++---
.../activemq/artemis/core/server/Queue.java | 8 +
.../artemis/core/server/QueueConfig.java | 35 ++-
.../artemis/core/server/ServerSession.java | 52 +++-
.../cluster/impl/ClusterConnectionImpl.java | 3 +-
.../core/server/impl/ActiveMQServerImpl.java | 134 +++++----
.../artemis/core/server/impl/AddressInfo.java | 123 ++++----
.../core/server/impl/LastValueQueue.java | 4 +-
.../server/impl/PostOfficeJournalLoader.java | 6 +-
.../core/server/impl/QueueFactoryImpl.java | 7 +-
.../artemis/core/server/impl/QueueImpl.java | 26 +-
.../core/server/impl/ServerSessionImpl.java | 79 ++++-
.../resources/schema/artemis-configuration.xsd | 45 +--
.../core/config/impl/FileConfigurationTest.java | 37 ++-
.../core/message/impl/MessagePropertyTest.java | 6 +-
.../impl/ScheduledDeliveryHandlerTest.java | 11 +
.../resources/ConfigurationTest-full-config.xml | 22 +-
.../addressing/AddressConfigTest.java | 9 +-
.../integration/addressing/AddressingTest.java | 53 +---
.../integration/cli/AddressCommandTest.java | 9 +-
.../integration/client/HangConsumerTest.java | 10 +-
.../tests/integration/client/ProducerTest.java | 3 +-
.../AnycastRoutingWithClusterTest.java | 10 +-
.../cluster/distribution/ClusterTestBase.java | 8 +-
.../failover/AsynchronousFailoverTest.java | 2 +-
.../cluster/failover/BackupSyncJournalTest.java | 3 +-
.../cluster/failover/FailoverTest.java | 49 ++--
.../ReplicatedMultipleServerFailoverTest.java | 3 +-
.../MultiThreadRandomReattachTestBase.java | 28 +-
.../cluster/reattach/OrderReattachTest.java | 3 +-
.../crossprotocol/OpenWireToAMQPTest.java | 3 +-
.../tests/integration/divert/DivertTest.java | 79 ++---
.../jms/client/TopicCleanupTest.java | 2 +-
.../integration/jms/consumer/ConsumerTest.java | 2 +-
.../ActiveMQServerControlUsingCoreTest.java | 36 ++-
.../integration/mqtt/imported/MQTTTest.java | 9 +-
.../integration/openwire/BasicOpenWireTest.java | 9 +-
.../openwire/amq/JmsTopicRedeliverTest.java | 2 +-
.../integration/security/LDAPSecurityTest.java | 5 +-
.../integration/server/ExpiryRunnerTest.java | 11 +-
.../integration/server/PredefinedQueueTest.java | 3 +-
.../tests/integration/stomp/StompTest.java | 51 ++--
.../tests/integration/stomp/StompTestBase.java | 10 +-
.../integration/stomp/v11/StompV11Test.java | 9 +-
.../integration/stomp/v12/StompV12Test.java | 9 +-
.../jms/tests/message/MessageHeaderTest.java | 239 ++++++++++++++-
.../activemq/artemis/common/AbstractAdmin.java | 4 +-
.../jms/conform/message/MessageDefaultTest.java | 2 +-
.../message/headers/MessageHeaderTest.java | 4 +-
.../unit/core/postoffice/impl/FakeQueue.java | 11 +
95 files changed, 2086 insertions(+), 824 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java
index 86aafaf..ac1a9a9 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/CreateAddress.java
@@ -17,12 +17,16 @@
package org.apache.activemq.artemis.cli.commands.address;
+import java.util.HashSet;
+import java.util.Set;
+
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.cli.commands.AbstractAction;
import org.apache.activemq.artemis.cli.commands.ActionContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
@Command(name = "create", description = "create an address")
public class CreateAddress extends AbstractAction {
@@ -30,8 +34,8 @@ public class CreateAddress extends AbstractAction {
@Option(name = "--name", description = "The name of this address")
String name;
- @Option(name = "--routingType", description = "The routing type of the address, options are 'anycast' or 'multicast', defaults to 1 = 'multicast'")
- String routingType = "multicast";
+ @Option(name = "--routingTypes", description = "The routing types supported by this address, options are 'anycast' or 'multicast', enter comma separated list, defaults to 'multicast' only")
+ Set<RoutingType> routingTypes = new HashSet<>();
@Option(name = "--defaultMaxConsumers", description = "Sets the default max consumers for any queues created under this address, default = -1 (no limit)")
int defaultMaxConsumers = -1;
@@ -50,7 +54,7 @@ public class CreateAddress extends AbstractAction {
performCoreManagement(new ManagementCallback<ClientMessage>() {
@Override
public void setUpInvocation(ClientMessage message) throws Exception {
- ManagementHelper.putOperationInvocation(message, "broker", "createAddress", getName(), routingType, defaultDeleteOnNoConsumers, defaultMaxConsumers);
+ ManagementHelper.putOperationInvocation(message, "broker", "createAddress", getName(), routingTypes, defaultDeleteOnNoConsumers, defaultMaxConsumers);
}
@Override
@@ -74,12 +78,14 @@ public class CreateAddress extends AbstractAction {
return name;
}
- public String getRoutingType() {
- return routingType;
+ public Set<RoutingType> getRoutingTypes() {
+ return routingTypes;
}
- public void setRoutingType(String routingType) {
- this.routingType = routingType;
+ public void setRoutingTypes(String routingTypes) {
+ for (String s : routingTypes.split(",")) {
+ this.routingTypes.add(RoutingType.valueOf(s.trim()));
+ }
}
public int getDefaultMaxConsumers() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index e6f3795..e75c663 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.config;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
/**
* Default values of ActiveMQ Artemis configuration parameters.
@@ -441,6 +442,8 @@ public final class ActiveMQDefaultConfiguration {
public static final boolean DEFAULT_DELETE_QUEUE_ON_NO_CONSUMERS = false;
+ public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST;
+
public static final String DEFAULT_INTERNAL_NAMING_PREFIX = "$.artemis.internal.";
/**
@@ -1192,4 +1195,8 @@ public final class ActiveMQDefaultConfiguration {
public static String getInternalNamingPrefix() {
return DEFAULT_INTERNAL_NAMING_PREFIX;
}
+
+ public static RoutingType getDefaultRoutingType() {
+ return DEFAULT_ROUTING_TYPE;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index 72b1a11..a414f95 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -18,9 +18,11 @@ package org.apache.activemq.artemis.api.core.client;
import javax.transaction.xa.XAResource;
import java.util.List;
+import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
/**
* A ClientSession is a single-thread object required for producing and consuming messages.
@@ -198,7 +200,22 @@ public interface ClientSession extends XAResource, AutoCloseable {
*/
int getVersion();
- void createAddress(final SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException;
+ /**
+ * Create Address with a set of initial routing types
+ * @param address
+ * @param autoCreated
+ * @throws ActiveMQException
+ */
+ void createAddress(final SimpleString address, Set<RoutingType> routingTypes, final boolean autoCreated) throws ActiveMQException;
+
+ /**
+ * Create Address with a single initial routing type
+ * @param address
+ * @param routingType
+ * @param autoCreated
+ * @throws ActiveMQException
+ */
+ void createAddress(final SimpleString address, RoutingType routingType, final boolean autoCreated) throws ActiveMQException;
// Queue Operations ----------------------------------------------
@@ -210,6 +227,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param durable whether the queue is durable or not
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createQueue(SimpleString address, SimpleString queueName, boolean durable) throws ActiveMQException;
/**
@@ -222,6 +240,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param durable if the queue is durable
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createSharedQueue(SimpleString address, SimpleString queueName, boolean durable) throws ActiveMQException;
/**
@@ -235,6 +254,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param durable if the queue is durable
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createSharedQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
@@ -248,6 +268,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param durable whether the queue is durable or not
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createQueue(String address, String queueName, boolean durable) throws ActiveMQException;
/**
@@ -257,6 +278,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param queueName the name of the queue
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createQueue(String address, String queueName) throws ActiveMQException;
/**
@@ -266,6 +288,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param queueName the name of the queue
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createQueue(SimpleString address, SimpleString queueName) throws ActiveMQException;
/**
@@ -277,6 +300,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param durable whether the queue is durable or not
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
@@ -291,6 +315,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param filter only messages which match this filter will be put in the queue
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createQueue(String address, String queueName, String filter, boolean durable) throws ActiveMQException;
/**
@@ -303,6 +328,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param autoCreated whether to mark this queue as autoCreated or not
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createQueue(SimpleString address,
SimpleString queueName,
SimpleString filter,
@@ -319,6 +345,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param autoCreated whether to mark this queue as autoCreated or not
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createQueue(String address, String queueName, String filter, boolean durable, boolean autoCreated) throws ActiveMQException;
/**
@@ -328,6 +355,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param queueName the name of the queue
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createTemporaryQueue(SimpleString address, SimpleString queueName) throws ActiveMQException;
/**
@@ -337,6 +365,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param queueName the name of the queue
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createTemporaryQueue(String address, String queueName) throws ActiveMQException;
/**
@@ -347,6 +376,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param filter only messages which match this filter will be put in the queue
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createTemporaryQueue(SimpleString address,
SimpleString queueName,
SimpleString filter) throws ActiveMQException;
@@ -359,8 +389,177 @@ public interface ClientSession extends XAResource, AutoCloseable {
* @param filter only messages which match this filter will be put in the queue
* @throws ActiveMQException in an exception occurs while creating the queue
*/
+ @Deprecated
void createTemporaryQueue(String address, String queueName, String filter) throws ActiveMQException;
+ /** Deprecate **/
+
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param durable whether the queue is durable or not
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, boolean durable) throws ActiveMQException;
+
+ /**
+ * Creates a transient queue. A queue that will exist as long as there are consumers. When the last consumer is closed the queue will be deleted
+ * <p>
+ * Notice: you will get an exception if the address or the filter doesn't match to an already existent queue
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param durable if the queue is durable
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, boolean durable) throws ActiveMQException;
+
+ /**
+ * Creates a transient queue. A queue that will exist as long as there are consumers. When the last consumer is closed the queue will be deleted
+ * <p>
+ * Notice: you will get an exception if the address or the filter doesn't match to an already existent queue
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @param durable if the queue is durable
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+ boolean durable) throws ActiveMQException;
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param durable whether the queue is durable or not
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createQueue(String address, RoutingType routingType, String queueName, boolean durable) throws ActiveMQException;
+
+ /**
+ * Creates a <em>non-temporary</em>, <em>non-durable</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createQueue(String address, RoutingType routingType, String queueName) throws ActiveMQException;
+
+ /**
+ * Creates a <em>non-temporary</em>, <em>non-durable</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName) throws ActiveMQException;
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @param durable whether the queue is durable or not
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+ boolean durable) throws ActiveMQException;
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @param durable whether the queue is durable or not
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable) throws ActiveMQException;
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @param durable whether the queue is durable or not
+ * @param autoCreated whether to mark this queue as autoCreated or not
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+ boolean durable,
+ boolean autoCreated) throws ActiveMQException;
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @param durable whether the queue is durable or not
+ * @param autoCreated whether to mark this queue as autoCreated or not
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable, boolean autoCreated) throws ActiveMQException;
+
+ /**
+ * Creates a <em>temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createTemporaryQueue(SimpleString address, RoutingType routingType, SimpleString queueName) throws ActiveMQException;
+
+ /**
+ * Creates a <em>temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createTemporaryQueue(String address, RoutingType routingType, String queueName) throws ActiveMQException;
+
+ /**
+ * Creates a <em>temporary</em> queue with a filter.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createTemporaryQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter) throws ActiveMQException;
+
+ /**
+ * Creates a <em>temporary</em> queue with a filter.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @throws ActiveMQException in an exception occurs while creating the queue
+ */
+ void createTemporaryQueue(String address, RoutingType routingType, String queueName, String filter) throws ActiveMQException;
+
/**
* Deletes the queue.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 33584bf..43e7a4d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -18,8 +18,10 @@ package org.apache.activemq.artemis.api.core.management;
import javax.management.MBeanOperationInfo;
import java.util.Map;
+import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
+import org.apache.activemq.artemis.core.server.RoutingType;
/**
* An ActiveMQServerControl is used to manage ActiveMQ Artemis servers.
@@ -434,18 +436,9 @@ public interface ActiveMQServerControl {
// Operations ----------------------------------------------------
- @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
- void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
- @Parameter(name = "routingType", desc = "the routing type of the address either 0 for multicast or 1 for anycast") int routingType,
- @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers,
- @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception;
-
-
- @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION)
+ @Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
- @Parameter(name = "routingType", desc = "The routing type for the address either 'MULTICAST' or 'ANYCAST'") String routingType,
- @Parameter(name = "defaultDeleteOnNoConsumers", desc = "Whether or not a queue with this address is deleted when it has no consumers") boolean defaultDeleteOnNoConsumers,
- @Parameter(name = "defaultMaxConsumers", desc = "The maximim number of consumer a queue with this address can have") int defaultMaxConsumers) throws Exception;
+ @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception;
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception;
@@ -464,6 +457,14 @@ public interface ActiveMQServerControl {
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
+ void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+ @Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType,
+ @Parameter(name = "name", desc = "Name of the queue") String name,
+ @Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
+ @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+ @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
+ @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
+ @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
/**
* Create a queue.
@@ -500,25 +501,6 @@ public interface ActiveMQServerControl {
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
/**
- * Create a queue.
- * <br>
- * If {@code address} is {@code null} it will be defaulted to {@code name}.
- * <br>
- * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
- *
- * @param address address to bind the queue to
- * @param name name of the queue
- * @param durable whether the queue is durable
- */
- @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
- void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
- @Parameter(name = "name", desc = "Name of the queue") String name,
- @Parameter(name = "filter", desc = "Filter of the queue") String filter,
- @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
- @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
- @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
- @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
- /**
* Deploy a durable queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
index 5e7d600..c48ef88 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java
@@ -18,6 +18,9 @@ package org.apache.activemq.artemis.api.core.management;
import javax.management.MBeanOperationInfo;
import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.artemis.core.server.RoutingType;
/**
* An AddressControl is used to manage an address.
@@ -31,10 +34,10 @@ public interface AddressControl {
String getAddress();
/*
- * The routing type of this address, either multicast (topic subscriptions) or anycast (queue semantics).
+ * The set of routing types (delivery modes) enabled on this address.
* */
- @Attribute(desc = "The routing type of this address")
- String getRoutingType();
+ @Attribute(desc = "Get the delivery modes enabled on this address")
+ Set<RoutingType> getDeliveryModes();
/**
* Returns the roles (name and permissions) associated with this address.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 145ca99..1ed825b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -43,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -237,14 +239,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override
public void createQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException {
- internalCreateQueue(address, queueName, null, false, false, false);
+ createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName);
}
@Override
public void createQueue(final SimpleString address,
final SimpleString queueName,
final boolean durable) throws ActiveMQException {
- internalCreateQueue(address, queueName, null, durable, false, false);
+ createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, durable);
}
@Override
@@ -258,7 +260,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
public void createSharedQueue(SimpleString address,
SimpleString queueName,
boolean durable) throws ActiveMQException {
- createSharedQueue(address, queueName, null, durable);
+ createSharedQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, null, durable);
}
@Override
@@ -266,28 +268,26 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
SimpleString queueName,
SimpleString filterString,
boolean durable) throws ActiveMQException {
+ createSharedQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable);
+ }
+ @Override
+ public void createAddress(final SimpleString address, Set<RoutingType> routingTypes, boolean autoCreated) throws ActiveMQException {
checkClosed();
startCall();
try {
- sessionContext.createSharedQueue(address, queueName, filterString, durable);
+ sessionContext.createAddress(address, routingTypes, autoCreated);
} finally {
endCall();
}
-
}
@Override
- public void createAddress(final SimpleString address, final boolean multicast, boolean autoCreated) throws ActiveMQException {
- checkClosed();
-
- startCall();
- try {
- sessionContext.createAddress(address, multicast, autoCreated);
- } finally {
- endCall();
- }
+ public void createAddress(final SimpleString address, RoutingType routingType, boolean autoCreated) throws ActiveMQException {
+ Set<RoutingType> routingTypes = new HashSet<>();
+ routingTypes.add(routingType);
+ createAddress(address, routingTypes, autoCreated);
}
@Override
@@ -295,7 +295,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final SimpleString queueName,
final SimpleString filterString,
final boolean durable) throws ActiveMQException {
- internalCreateQueue(address, queueName, filterString, durable, false, false);
+ createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString,
+ durable);
}
@Override
@@ -303,7 +304,10 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final String queueName,
final String filterString,
final boolean durable) throws ActiveMQException {
- createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filterString), durable);
+ createQueue(SimpleString.toSimpleString(address),
+ SimpleString.toSimpleString(queueName),
+ SimpleString.toSimpleString(filterString),
+ durable);
}
@Override
@@ -312,7 +316,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final SimpleString filterString,
final boolean durable,
final boolean autoCreated) throws ActiveMQException {
- internalCreateQueue(address, queueName, filterString, durable, false, autoCreated);
+ createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString,
+ durable,
+ autoCreated);
}
@Override
@@ -326,29 +332,258 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override
public void createTemporaryQueue(final SimpleString address, final SimpleString queueName) throws ActiveMQException {
- internalCreateQueue(address, queueName, null, false, true, false);
+ createTemporaryQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName);
}
@Override
public void createTemporaryQueue(final String address, final String queueName) throws ActiveMQException {
- internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), null, false, true, false);
+ createTemporaryQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
}
@Override
public void createTemporaryQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filter) throws ActiveMQException {
- internalCreateQueue(address, queueName, filter, false, true, false);
+ createTemporaryQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filter);
}
@Override
public void createTemporaryQueue(final String address,
final String queueName,
final String filter) throws ActiveMQException {
- internalCreateQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true, false);
+ createTemporaryQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filter);
+ }
+
+
+ /** New Queue API **/
+
+
+ @Override
+ public void createQueue(final SimpleString address,
+ final RoutingType routingType,
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean durable,
+ final boolean autoCreated) throws ActiveMQException {
+ internalCreateQueue(address,
+ queueName, routingType,
+ filterString,
+ durable,
+ false, // temp: this overload never creates a temporary queue
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ autoCreated);
+ }
+
+ @Override
+ public void createQueue(final String address, final RoutingType routingType, final String queueName, final String filterString,
+ final boolean durable,
+ final boolean autoCreated) throws ActiveMQException {
+ createQueue(SimpleString.toSimpleString(address),
+ routingType, // must be forwarded: without it the call binds to the legacy overload and the default routing type is used
+ SimpleString.toSimpleString(queueName),
+ SimpleString.toSimpleString(filterString),
+ durable, autoCreated);
}
@Override
+ public void createTemporaryQueue(final SimpleString address,
+ final RoutingType routingType,
+ final SimpleString queueName) throws ActiveMQException {
+ internalCreateQueue(address,
+ queueName, routingType,
+ null, // filterString: no filter
+ false, // durable
+ true, // temp
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ false); // autoCreated
+ }
+
+ @Override
+ public void createTemporaryQueue(final String address, final RoutingType routingType, final String queueName) throws ActiveMQException {
+ createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName)); // delegates to the SimpleString overload
+ }
+
+ @Override
+ public void createTemporaryQueue(final SimpleString address,
+ final RoutingType routingType,
+ final SimpleString queueName,
+ final SimpleString filter) throws ActiveMQException {
+ internalCreateQueue(address,
+ queueName, routingType,
+ filter,
+ false, // durable
+ true, // temp
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ false); // autoCreated
+ }
+
+ @Override
+ public void createTemporaryQueue(final String address, final RoutingType routingType, final String queueName, final String filter) throws ActiveMQException {
+ createTemporaryQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter)); // the filter must reach the SimpleString overload, not be dropped
+ }
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param durable whether the queue is durable or not
+ * @throws ActiveMQException if an exception occurs while creating the queue
+ */
+ @Override
+ public void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, boolean durable) throws ActiveMQException {
+ internalCreateQueue(address,
+ queueName, routingType,
+ null, // filterString: no filter
+ durable,
+ false, // temp
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ false); // autoCreated
+ }
+
+ /**
+ * Creates a transient queue: a queue that will exist as long as there are consumers. When the last consumer is closed the queue will be deleted.
+ * <p>
+ * Notice: you will get an exception if the address or the filter doesn't match an already existing queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param durable if the queue is durable
+ * @throws ActiveMQException if an exception occurs while creating the queue
+ */
+ @Override
+ public void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, boolean durable) throws ActiveMQException {
+ createSharedQueue(address, routingType, queueName, null, durable);
+ }
+
+ /**
+ * Creates a transient queue: a queue that will exist as long as there are consumers. When the last consumer is closed the queue will be deleted.
+ * <p>
+ * Notice: you will get an exception if the address or the filter doesn't match an already existing queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @param durable if the queue is durable
+ * @throws ActiveMQException if an exception occurs while creating the queue
+ */
+ @Override
+ public void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+ boolean durable) throws ActiveMQException {
+ checkClosed();
+
+ startCall();
+ try {
+ sessionContext.createSharedQueue(address, queueName, routingType, filter, durable);
+ } finally {
+ endCall();
+ }
+ }
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param durable whether the queue is durable or not
+ * @throws ActiveMQException if an exception occurs while creating the queue
+ */
+ @Override
+ public void createQueue(String address, RoutingType routingType, String queueName, boolean durable) throws ActiveMQException {
+ createQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), durable);
+ }
+
+ /**
+ * Creates a <em>non-temporary</em>, <em>non-durable</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @throws ActiveMQException if an exception occurs while creating the queue
+ */
+ @Override
+ public void createQueue(String address, RoutingType routingType, String queueName) throws ActiveMQException {
+ internalCreateQueue(SimpleString.toSimpleString(address),
+ SimpleString.toSimpleString(queueName), routingType,
+ null, // filterString: no filter
+ false, // durable
+ false, // temp: the queue is documented as non-temporary
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ false); // autoCreated
+ }
+
+ /**
+ * Creates a <em>non-temporary</em>, <em>non-durable</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @throws ActiveMQException if an exception occurs while creating the queue
+ */
+ @Override
+ public void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName) throws ActiveMQException {
+ internalCreateQueue(address,
+ queueName,
+ routingType,
+ null, // filterString: no filter
+ false, // durable: the queue is documented as non-durable
+ false, // temp
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ false); // autoCreated
+ }
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @param durable whether the queue is durable or not
+ * @throws ActiveMQException if an exception occurs while creating the queue
+ */
+ @Override
+ public void createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+ boolean durable) throws ActiveMQException {
+ internalCreateQueue(address,
+ queueName,
+ routingType,
+ filter,
+ durable,
+ false, // temp: a non-durable queue must not be turned into a temporary one
+ ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
+ ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(),
+ false); // autoCreated
+ }
+
+ /**
+ * Creates a <em>non-temporary</em> queue.
+ *
+ * @param address the queue will be bound to this address
+ * @param routingType the delivery mode for this queue, MULTICAST or ANYCAST
+ * @param queueName the name of the queue
+ * @param filter only messages which match this filter will be put in the queue
+ * @param durable whether the queue is durable or not
+ * @throws ActiveMQException if an exception occurs while creating the queue
+ */
+ @Override
+ public void createQueue(String address, RoutingType routingType, String queueName, String filter, boolean durable) throws ActiveMQException {
+ createQueue(SimpleString.toSimpleString(address), routingType, SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter),
+ durable);
+ }
+
+
+ @Override
public void deleteQueue(final SimpleString queueName) throws ActiveMQException {
checkClosed();
@@ -1567,9 +1802,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private void internalCreateQueue(final SimpleString address,
final SimpleString queueName,
+ final RoutingType routingType,
final SimpleString filterString,
final boolean durable,
final boolean temp,
+ final int maxConsumers,
+ final boolean deleteOnNoConsumers,
final boolean autoCreated) throws ActiveMQException {
checkClosed();
@@ -1579,7 +1817,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
startCall();
try {
- sessionContext.createQueue(address, queueName, filterString, durable, temp, autoCreated);
+ sessionContext.createQueue(address,
+ routingType,
+ queueName,
+ filterString,
+ durable,
+ temp,
+ maxConsumers, // use the caller-supplied value rather than re-reading the default
+ deleteOnNoConsumers, // use the caller-supplied value rather than re-reading the default
+ autoCreated);
} finally {
endCall();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index cbbe2b7..29426dd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -24,8 +24,10 @@ import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -52,9 +54,9 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -100,6 +102,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@@ -240,9 +243,18 @@ public class ActiveMQSessionContext extends SessionContext {
@Override
public void createSharedQueue(SimpleString address,
SimpleString queueName,
+ RoutingType routingType,
SimpleString filterString,
boolean durable) throws ActiveMQException {
- sessionChannel.sendBlocking(new CreateSharedQueueMessage(address, queueName, filterString, durable, true), PacketImpl.NULL_RESPONSE);
+ sessionChannel.sendBlocking(new CreateSharedQueueMessage_V2(address, queueName, routingType, filterString, durable, true), PacketImpl.NULL_RESPONSE);
+ }
+
+ @Override
public void createSharedQueue(SimpleString address,
SimpleString queueName,
SimpleString filterString,
boolean durable) throws ActiveMQException {
+ createSharedQueue(address, queueName, ActiveMQDefaultConfiguration.getDefaultRoutingType(), filterString, durable); // legacy overload: use the default routing type, never null
}
@Override
@@ -585,19 +597,35 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
- public void createAddress(SimpleString address, final boolean multicast, final boolean autoCreated) throws ActiveMQException {
- CreateAddressMessage request = new CreateAddressMessage(address, multicast, autoCreated, true);
+ public void createAddress(SimpleString address,
+ Set<RoutingType> routingTypes,
+ final boolean autoCreated) throws ActiveMQException {
+ CreateAddressMessage request = new CreateAddressMessage(address, routingTypes, autoCreated, true);
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
}
+ @Deprecated
+ @Override
+ public void createQueue(SimpleString address,
+ SimpleString queueName,
+ SimpleString filterString,
+ boolean durable,
+ boolean temp,
+ boolean autoCreated) throws ActiveMQException {
+ createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable, temp, ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers(), autoCreated);
+ }
+
@Override
public void createQueue(SimpleString address,
+ RoutingType routingType,
SimpleString queueName,
SimpleString filterString,
boolean durable,
boolean temp,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
boolean autoCreated) throws ActiveMQException {
- CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, filterString, durable, temp, autoCreated, true);
+ CreateQueueMessage request = new CreateQueueMessage_V3(address, queueName, routingType, filterString, durable, temp, maxConsumers, deleteOnNoConsumers, autoCreated, true);
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
}
@@ -682,6 +710,7 @@ public class ActiveMQSessionContext extends SessionContext {
// they are defined in broker.xml
// This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
if (!queueInfo.isDurable()) {
+ // TODO (mtaylor) QueueInfo needs updating to include new parameters, this method should pass in del mode
CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getFilterString(), false, queueInfo.isTemporary(), false);
sendPacketWithoutLock(sessionChannel, createQueueRequest);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index de1edbc..dbd7091 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -30,9 +30,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTop
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
@@ -93,7 +95,9 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CRE
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
@@ -251,10 +255,18 @@ public abstract class PacketDecoder implements Serializable {
packet = new CreateQueueMessage_V2();
break;
}
+ case CREATE_QUEUE_V3: {
+ packet = new CreateQueueMessage_V3();
+ break;
+ }
case CREATE_SHARED_QUEUE: {
packet = new CreateSharedQueueMessage();
break;
}
+ case CREATE_SHARED_QUEUE_V2: {
+ packet = new CreateSharedQueueMessage_V2();
+ break;
+ }
case DELETE_QUEUE: {
packet = new SessionDeleteQueueMessage();
break;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index abc1eef..e252623 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -253,6 +253,10 @@ public class PacketImpl implements Packet {
public static final byte CREATE_QUEUE_V2 = -12;
+ public static final byte CREATE_QUEUE_V3 = -13;
+
+ public static final byte CREATE_SHARED_QUEUE_V2 = -14;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
index 10c7ff3..9b18e48 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
@@ -16,28 +16,32 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.server.RoutingType;
public class CreateAddressMessage extends PacketImpl {
private SimpleString address;
- private boolean multicast;
+ private Set<RoutingType> routingTypes;
private boolean autoCreated;
private boolean requiresResponse;
public CreateAddressMessage(final SimpleString address,
- final boolean multicast,
+ Set<RoutingType> routingTypes,
final boolean autoCreated,
final boolean requiresResponse) {
this();
this.address = address;
- this.multicast = multicast;
+ this.routingTypes = routingTypes;
this.autoCreated = autoCreated;
this.requiresResponse = requiresResponse;
}
@@ -52,7 +56,7 @@ public class CreateAddressMessage extends PacketImpl {
public String toString() {
StringBuffer buff = new StringBuffer(getParentString());
buff.append(", address=" + address);
- buff.append(", multicast=" + multicast);
+ buff.append(", routingTypes=" + routingTypes);
buff.append(", autoCreated=" + autoCreated);
buff.append("]");
return buff.toString();
@@ -62,10 +66,6 @@ public class CreateAddressMessage extends PacketImpl {
return address;
}
- public boolean isMulticast() {
- return multicast;
- }
-
public boolean isRequiresResponse() {
return requiresResponse;
}
@@ -78,10 +78,21 @@ public class CreateAddressMessage extends PacketImpl {
this.address = address;
}
+ public Set<RoutingType> getRoutingTypes() {
+ return routingTypes;
+ }
+
+ public void setRoutingTypes(Set<RoutingType> routingTypes) {
+ this.routingTypes = routingTypes;
+ }
+
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(address);
- buffer.writeBoolean(multicast);
+ buffer.writeInt(routingTypes.size());
+ for (RoutingType routingType : routingTypes) {
+ buffer.writeByte(routingType.getType());
+ }
buffer.writeBoolean(requiresResponse);
buffer.writeBoolean(autoCreated);
}
@@ -89,7 +100,11 @@ public class CreateAddressMessage extends PacketImpl {
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
address = buffer.readSimpleString();
- multicast = buffer.readBoolean();
+ int routingTypeSetSize = buffer.readInt();
+ routingTypes = new HashSet<>(routingTypeSetSize);
+ for (int i = 0; i < routingTypeSetSize; i++) {
+ routingTypes.add(RoutingType.getType(buffer.readByte()));
+ }
requiresResponse = buffer.readBoolean();
autoCreated = buffer.readBoolean();
}
@@ -99,7 +114,7 @@ public class CreateAddressMessage extends PacketImpl {
final int prime = 31;
int result = super.hashCode();
result = prime * result + ((address == null) ? 0 : address.hashCode());
- result = prime * result + (multicast ? 1231 : 1237);
+ result = prime * result + ((routingTypes == null) ? 0 : routingTypes.hashCode()); // null-safe, like address above
result = prime * result + (autoCreated ? 1231 : 1237);
result = prime * result + (requiresResponse ? 1231 : 1237);
return result;
@@ -119,7 +134,7 @@ public class CreateAddressMessage extends PacketImpl {
return false;
} else if (!address.equals(other.address))
return false;
- if (multicast != other.multicast)
+ if (routingTypes == null ? other.routingTypes != null : !routingTypes.equals(other.routingTypes)) // differing routing types => not equal (null-safe)
return false;
if (autoCreated != other.autoCreated)
return false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
index 13a4a58..610646e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
@@ -21,7 +21,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
public class CreateQueueMessage_V2 extends CreateQueueMessage {
- private boolean autoCreated;
+ protected boolean autoCreated;
public CreateQueueMessage_V2(final SimpleString address,
final SimpleString queueName,
@@ -45,6 +45,10 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
super(CREATE_QUEUE_V2);
}
+ public CreateQueueMessage_V2(byte packet) { // packet type is supplied by the caller rather than fixed to CREATE_QUEUE_V2
+ super(packet);
+ }
+
// Public --------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java
new file mode 100644
index 0000000..fb5c9ef
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
+
+/**
+ * Create-queue packet sent with packet type {@code CREATE_QUEUE_V3}. On top of the fields
+ * handled by {@link CreateQueueMessage_V2} it carries a routing type, a maximum consumer
+ * count and a delete-on-no-consumers flag, encoded in that order after the parent's payload.
+ */
+public class CreateQueueMessage_V3 extends CreateQueueMessage_V2 {
+
+   private RoutingType routingType;
+
+   private int maxConsumers;
+
+   private boolean deleteOnNoConsumers;
+
+   public CreateQueueMessage_V3(final SimpleString address,
+                                final SimpleString queueName,
+                                final RoutingType routingType,
+                                final SimpleString filterString,
+                                final boolean durable,
+                                final boolean temporary,
+                                final int maxConsumers,
+                                final boolean deleteOnNoConsumers,
+                                final boolean autoCreated,
+                                final boolean requiresResponse) {
+      this();
+
+      this.address = address;
+      this.queueName = queueName;
+      this.filterString = filterString;
+      this.durable = durable;
+      this.temporary = temporary;
+      this.autoCreated = autoCreated;
+      this.requiresResponse = requiresResponse;
+      this.routingType = routingType;
+      this.maxConsumers = maxConsumers;
+      this.deleteOnNoConsumers = deleteOnNoConsumers;
+   }
+
+   public CreateQueueMessage_V3() {
+      super(CREATE_QUEUE_V3);
+   }
+
+   // Public --------------------------------------------------------
+
+   @Override
+   public String toString() {
+      StringBuilder buff = new StringBuilder(super.getParentString());
+      buff.append(", routingType=").append(routingType);
+      buff.append(", maxConsumers=").append(maxConsumers);
+      buff.append(", deleteOnNoConsumers=").append(deleteOnNoConsumers);
+      buff.append("]");
+      return buff.toString();
+   }
+
+   public RoutingType getRoutingType() {
+      return routingType;
+   }
+
+   public void setRoutingType(RoutingType routingType) {
+      this.routingType = routingType;
+   }
+
+   public int getMaxConsumers() {
+      return maxConsumers;
+   }
+
+   public void setMaxConsumers(int maxConsumers) {
+      this.maxConsumers = maxConsumers;
+   }
+
+   public boolean isDeleteOnNoConsumers() {
+      return deleteOnNoConsumers;
+   }
+
+   public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
+      this.deleteOnNoConsumers = deleteOnNoConsumers;
+   }
+
+   /**
+    * Appends routing type, max consumers and the delete flag after the parent's fields.
+    * An unset routing type is written as -1, which {@link RoutingType#getType(byte)} maps
+    * back to {@code null}, instead of failing with a NullPointerException.
+    */
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      super.encodeRest(buffer);
+      buffer.writeByte(routingTypeCode());
+      buffer.writeInt(maxConsumers);
+      buffer.writeBoolean(deleteOnNoConsumers);
+   }
+
+   /** Reads the fields in the same order {@link #encodeRest(ActiveMQBuffer)} writes them. */
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      super.decodeRest(buffer);
+      routingType = RoutingType.getType(buffer.readByte());
+      maxConsumers = buffer.readInt();
+      deleteOnNoConsumers = buffer.readBoolean();
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + routingTypeCode();
+      result = prime * result + maxConsumers;
+      result = prime * result + (deleteOnNoConsumers ? 1231 : 1237);
+      return result;
+   }
+
+   /**
+    * Two V3 messages are equal when the parent considers them equal and autoCreated,
+    * routing type, max consumers and the delete flag all match.
+    */
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof CreateQueueMessage_V3))
+         return false;
+      CreateQueueMessage_V3 other = (CreateQueueMessage_V3) obj;
+      if (autoCreated != other.autoCreated)
+         return false;
+      if (routingType != other.routingType)
+         return false;
+      if (maxConsumers != other.maxConsumers)
+         return false;
+      if (deleteOnNoConsumers != other.deleteOnNoConsumers)
+         return false;
+      return true;
+   }
+
+   // Byte sent on the wire / mixed into the hash for the routing type; -1 when none is set.
+   private byte routingTypeCode() {
+      return routingType == null ? -1 : routingType.getType();
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
index f896102..af25ae9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage.java
@@ -22,15 +22,15 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class CreateSharedQueueMessage extends PacketImpl {
- private SimpleString address;
+ protected SimpleString address;
- private SimpleString queueName;
+ protected SimpleString queueName;
- private SimpleString filterString;
+ protected SimpleString filterString;
- private boolean durable;
+ protected boolean durable;
- private boolean requiresResponse;
+ protected boolean requiresResponse;
public CreateSharedQueueMessage(final SimpleString address,
final SimpleString queueName,
@@ -47,7 +47,11 @@ public class CreateSharedQueueMessage extends PacketImpl {
}
public CreateSharedQueueMessage() {
- super(CREATE_SHARED_QUEUE);
+ this(CREATE_SHARED_QUEUE); // default packet type; delegates to the packet-type constructor below
+ }
+
+ public CreateSharedQueueMessage(byte packetType) { // packet type is supplied by the caller
+ super(packetType);
}
// Public --------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
new file mode 100644
index 0000000..7c45ca7
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.server.RoutingType;
+
+/**
+ * Shared-queue creation packet sent with packet type {@code CREATE_SHARED_QUEUE_V2}; it adds
+ * a routing type to the fields inherited from {@link CreateSharedQueueMessage}.
+ */
+public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
+
+   private RoutingType routingType;
+
+   public CreateSharedQueueMessage_V2(final SimpleString address,
+                                      final SimpleString queueName,
+                                      final RoutingType routingType,
+                                      final SimpleString filterString,
+                                      final boolean durable,
+                                      final boolean requiresResponse) {
+      this();
+
+      this.address = address;
+      this.queueName = queueName;
+      this.filterString = filterString;
+      this.durable = durable;
+      this.requiresResponse = requiresResponse;
+      this.routingType = routingType;
+   }
+
+   public CreateSharedQueueMessage_V2() {
+      super(CREATE_SHARED_QUEUE_V2);
+   }
+
+   public RoutingType getRoutingType() {
+      return routingType;
+   }
+
+   public void setRoutingType(RoutingType routingType) {
+      this.routingType = routingType;
+   }
+
+   @Override
+   public String toString() {
+      StringBuilder buff = new StringBuilder(getParentString());
+      buff.append(", address=" + address);
+      buff.append(", queueName=" + queueName);
+      buff.append(", filterString=" + filterString);
+      buff.append(", durable=" + durable);
+      buff.append(", routingType=" + routingType);
+      buff.append(", requiresResponse=" + requiresResponse);
+      buff.append("]");
+      return buff.toString();
+   }
+
+   /**
+    * Writes address, queue name, nullable filter, durable flag, routing type and the
+    * requires-response flag, in that order. An unset routing type is written as -1, which
+    * {@link RoutingType#getType(byte)} maps back to {@code null}, instead of failing with a
+    * NullPointerException.
+    */
+   @Override
+   public void encodeRest(final ActiveMQBuffer buffer) {
+      buffer.writeSimpleString(address);
+      buffer.writeSimpleString(queueName);
+      buffer.writeNullableSimpleString(filterString);
+      buffer.writeBoolean(durable);
+      buffer.writeByte(routingTypeCode());
+      buffer.writeBoolean(requiresResponse);
+   }
+
+   /** Reads the fields in the same order {@link #encodeRest(ActiveMQBuffer)} writes them. */
+   @Override
+   public void decodeRest(final ActiveMQBuffer buffer) {
+      address = buffer.readSimpleString();
+      queueName = buffer.readSimpleString();
+      filterString = buffer.readNullableSimpleString();
+      durable = buffer.readBoolean();
+      routingType = RoutingType.getType(buffer.readByte());
+      requiresResponse = buffer.readBoolean();
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result + ((address == null) ? 0 : address.hashCode());
+      result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
+      result = prime * result + ((queueName == null) ? 0 : queueName.hashCode());
+      result = prime * result + (durable ? 1231 : 1237);
+      result = prime * result + routingTypeCode();
+      result = prime * result + (requiresResponse ? 1231 : 1237);
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj)
+         return true;
+      if (!super.equals(obj))
+         return false;
+      if (!(obj instanceof CreateSharedQueueMessage_V2))
+         return false;
+      CreateSharedQueueMessage_V2 other = (CreateSharedQueueMessage_V2) obj;
+      if (address == null) {
+         if (other.address != null)
+            return false;
+      } else if (!address.equals(other.address))
+         return false;
+      if (filterString == null) {
+         if (other.filterString != null)
+            return false;
+      } else if (!filterString.equals(other.filterString))
+         return false;
+      if (queueName == null) {
+         if (other.queueName != null)
+            return false;
+      } else if (!queueName.equals(other.queueName))
+         return false;
+      if (durable != other.durable)
+         return false;
+      if (routingType != other.routingType)
+         return false;
+      if (requiresResponse != other.requiresResponse)
+         return false;
+      return true;
+   }
+
+   // Byte sent on the wire / mixed into the hash for the routing type; -1 when none is set.
+   private byte routingTypeCode() {
+      return routingType == null ? -1 : routingType.getType();
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dad04960/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
new file mode 100644
index 0000000..2f17335
--- /dev/null
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/**
+ * Routing type with a one-byte code: {@code MULTICAST} is 0 and {@code ANYCAST} is 1.
+ */
+public enum RoutingType {
+
+   MULTICAST, ANYCAST;
+
+   /** Returns the byte code of this constant (0 for MULTICAST, 1 for ANYCAST). */
+   public byte getType() {
+      if (this == MULTICAST) {
+         return 0;
+      }
+      if (this == ANYCAST) {
+         return 1;
+      }
+      return -1;
+   }
+
+   /** Returns the constant whose byte code equals {@code type}, or {@code null} if none does. */
+   public static RoutingType getType(byte type) {
+      for (RoutingType candidate : values()) {
+         if (candidate.getType() == type) {
+            return candidate;
+         }
+      }
+      return null;
+   }
+}