You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/09 19:49:18 UTC
[34/50] [abbrv] activemq-artemis git commit: ARTEMIS-780 Consolidate
protocol packets and new Address/Queue commands
ARTEMIS-780 Consolidate protocol packets and new Address/Queue commands
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c480351c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c480351c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c480351c
Branch: refs/heads/master
Commit: c480351c11e038dac210cd921d618d2906a8ab72
Parents: 7a51491
Author: jbertram <jb...@apache.org>
Authored: Wed Nov 23 13:34:00 2016 -0600
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000
----------------------------------------------------------------------
.../activemq/artemis/cli/commands/Create.java | 8 +-
.../artemis/cli/commands/etc/broker.xml | 12 +-
.../core/management/ActiveMQServerControl.java | 47 +++++--
.../api/core/management/QueueControl.java | 12 ++
.../core/impl/ActiveMQSessionContext.java | 4 +-
.../core/protocol/core/impl/PacketDecoder.java | 6 -
.../core/protocol/core/impl/PacketImpl.java | 4 +-
.../impl/wireformat/CreateAddressMessage.java | 5 +-
.../impl/wireformat/CreateQueueMessage_V2.java | 64 ++++++++-
.../impl/wireformat/CreateQueueMessage_V3.java | 134 ------------------
.../wireformat/CreateSharedQueueMessage_V2.java | 1 -
.../artemis/junit/ActiveMQConsumerResource.java | 4 -
.../impl/ActiveMQServerControlImpl.java | 31 ++--
.../core/management/impl/QueueControlImpl.java | 24 ++++
.../core/ServerSessionPacketHandler.java | 20 ---
.../artemis/core/security/CheckType.java | 6 +
.../artemis/core/server/ActiveMQServer.java | 2 +-
.../core/server/impl/ActiveMQServerImpl.java | 15 +-
.../artemis/core/server/impl/AddressInfo.java | 1 -
.../management/impl/ManagementServiceImpl.java | 3 +-
.../client/AutoDeleteJmsDestinationTest.java | 2 +-
.../tests/integration/client/SessionTest.java | 2 +-
.../management/ActiveMQServerControlTest.java | 37 +++++
.../ActiveMQServerControlUsingCoreTest.java | 27 ++--
.../management/QueueControlUsingCoreTest.java | 10 ++
.../integration/server/PredefinedQueueTest.java | 1 -
.../tests/integration/stomp/StompTest.java | 6 +-
.../jms/tests/AutoAckMesageListenerTest.java | 141 -------------------
.../jms/tests/AutoAckMessageListenerTest.java | 141 +++++++++++++++++++
.../activemq/artemis/common/AbstractAdmin.java | 4 +-
30 files changed, 391 insertions(+), 383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index ff2753a..7175c8d 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -799,14 +799,14 @@ public class Create extends InputAbstract {
printWriter.println();
for (String str : getQueueList()) {
- printWriter.println(" <address name=\"" + str + "\" type=\"anycast\">");
- printWriter.println(" <queues>");
+ printWriter.println(" <address name=\"" + str + "\">");
+ printWriter.println(" <anycast>");
printWriter.println(" <queue name=\"" + str + "\" />");
- printWriter.println(" </queues>");
+ printWriter.println(" </anycast>");
printWriter.println(" </address>");
}
for (String str : getAddressList()) {
- printWriter.println(" <address name=\"" + str + "\" type=\"multicast\"/>");
+ printWriter.println(" <address name=\"" + str + "\"/>");
}
filters.put("${address-queue.settings}", writer.toString());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index 6c7f91c..ea51eb0 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -98,15 +98,15 @@ ${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st
</address-settings>
<addresses>
- <address name="DLQ" type="anycast">
- <queues>
+ <address name="DLQ">
+ <anycast>
<queue name="DLQ" />
- </queues>
+ </anycast>
</address>
- <address name="ExpiryQueue" type="anycast">
- <queues>
+ <address name="ExpiryQueue">
+ <anycast>
<queue name="ExpiryQueue" />
- </queues>
+ </anycast>
</address>${address-queue.settings}
</addresses>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 43e7a4d..b6b5b5e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -18,10 +18,8 @@ package org.apache.activemq.artemis.api.core.management;
import javax.management.MBeanOperationInfo;
import java.util.Map;
-import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
-import org.apache.activemq.artemis.core.server.RoutingType;
/**
* An ActiveMQServerControl is used to manage ActiveMQ Artemis servers.
@@ -438,7 +436,7 @@ public interface ActiveMQServerControl {
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
- @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception;
+ @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception;
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception;
@@ -457,14 +455,21 @@ public interface ActiveMQServerControl {
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
+ /**
+ * Create a queue.
+ * <br>
+ * If {@code address} is {@code null} it will be defaulted to {@code name}.
+ * <br>
+ * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException} exception if the queue already exists.
+ *
+ * @param address address to bind the queue to
+ * @param name name of the queue
+ * @param durable whether the queue is durable
+ */
+ @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
- @Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
- @Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
- @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
- @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
- @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
- @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
+ @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
/**
* Create a queue.
@@ -491,14 +496,25 @@ public interface ActiveMQServerControl {
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
- * @param address address to bind the queue to
- * @param name name of the queue
- * @param durable whether the queue is durable
+ * @param address address to bind the queue to
+ * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
+ * @param name name of the queue
+ * @param filterStr filter of the queue
+ * @param durable is the queue durable?
+ * @param maxConsumers the maximum number of consumers allowed on this queue at any one time
+ * @param deleteOnNoConsumers delete this queue when the last consumer disconnects
+ * @param autoCreateAddress create an address with default values should a matching address not be found
+ * @throws Exception
*/
- @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+ @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
- @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
+ @Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
+ @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+ @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
+ @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
+ @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
+
/**
* Deploy a durable queue.
@@ -552,7 +568,8 @@ public interface ActiveMQServerControl {
*/
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
- @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers, boolean autoDeleteAddress) throws Exception;
+ @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
+ @Parameter(name = "autoDeleteAddress", desc = "Automatically delete the address if this was the last queue") boolean autoDeleteAddress) throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index bbf365c..dbd3ea5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -140,6 +140,18 @@ public interface QueueControl {
@Attribute(desc = "dead-letter address associated with this queue")
String getDeadLetterAddress();
+ /**
+ * Returns the maximum number of consumers allowed on this queue at any one time.
+ */
+ @Attribute(desc = "maximum number of consumers allowed on this queue at any one time")
+ int getMaxConsumers();
+
+ /**
+ * Returns whether this queue is deleted when the last consumer disconnects.
+ */
+ @Attribute(desc = "delete this queue when the last consumer disconnects")
+ boolean isDeleteOnNoConsumers();
+
// Operations ----------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 29426dd..ed08142 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -54,7 +54,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerMessage;
@@ -625,7 +625,7 @@ public class ActiveMQSessionContext extends SessionContext {
int maxConsumers,
boolean deleteOnNoConsumers,
boolean autoCreated) throws ActiveMQException {
- CreateQueueMessage request = new CreateQueueMessage_V3(address, queueName, routingType, filterString, durable, temp, maxConsumers, deleteOnNoConsumers, autoCreated, true);
+ CreateQueueMessage request = new CreateQueueMessage_V2(address, queueName, routingType, filterString, durable, temp, maxConsumers, deleteOnNoConsumers, autoCreated, true);
sessionChannel.sendBlocking(request, PacketImpl.NULL_RESPONSE);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index dbd7091..15629c8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -30,7 +30,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTop
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
@@ -95,7 +94,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CRE
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
@@ -255,10 +253,6 @@ public abstract class PacketDecoder implements Serializable {
packet = new CreateQueueMessage_V2();
break;
}
- case CREATE_QUEUE_V3: {
- packet = new CreateQueueMessage_V3();
- break;
- }
case CREATE_SHARED_QUEUE: {
packet = new CreateSharedQueueMessage();
break;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index e252623..a65bdfc 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -253,9 +253,7 @@ public class PacketImpl implements Packet {
public static final byte CREATE_QUEUE_V2 = -12;
- public static final byte CREATE_QUEUE_V3 = -13;
-
- public static final byte CREATE_SHARED_QUEUE_V2 = -14;
+ public static final byte CREATE_SHARED_QUEUE_V2 = -13;
// Static --------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
index 9b18e48..35f0f40 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateAddressMessage.java
@@ -134,7 +134,10 @@ public class CreateAddressMessage extends PacketImpl {
return false;
} else if (!address.equals(other.address))
return false;
- if (routingTypes.equals(other.routingTypes))
+ if (routingTypes == null) {
+ if (other.routingTypes != null)
+ return false;
+ } else if (!routingTypes.equals(other.routingTypes))
return false;
if (autoCreated != other.autoCreated)
return false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
index 610646e..e2867ab 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
@@ -18,16 +18,26 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.RoutingType;
public class CreateQueueMessage_V2 extends CreateQueueMessage {
protected boolean autoCreated;
+ private RoutingType routingType;
+
+ private int maxConsumers;
+
+ private boolean deleteOnNoConsumers;
+
public CreateQueueMessage_V2(final SimpleString address,
final SimpleString queueName,
+ final RoutingType routingType,
final SimpleString filterString,
final boolean durable,
final boolean temporary,
+ final int maxConsumers,
+ final boolean deleteOnNoConsumers,
final boolean autoCreated,
final boolean requiresResponse) {
this();
@@ -39,26 +49,52 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
this.temporary = temporary;
this.autoCreated = autoCreated;
this.requiresResponse = requiresResponse;
+ this.routingType = routingType;
+ this.maxConsumers = maxConsumers;
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
}
public CreateQueueMessage_V2() {
super(CREATE_QUEUE_V2);
}
- public CreateQueueMessage_V2(byte packet) {
- super(packet);
- }
-
// Public --------------------------------------------------------
@Override
public String toString() {
StringBuffer buff = new StringBuffer(super.getParentString());
buff.append(", autoCreated=" + autoCreated);
+ buff.append(", routingType=" + routingType);
+ buff.append(", maxConsumers=" + maxConsumers);
+ buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers);
buff.append("]");
return buff.toString();
}
+ public RoutingType getRoutingType() {
+ return routingType;
+ }
+
+ public void setRoutingType(RoutingType routingType) {
+ this.routingType = routingType;
+ }
+
+ public int getMaxConsumers() {
+ return maxConsumers;
+ }
+
+ public void setMaxConsumers(int maxConsumers) {
+ this.maxConsumers = maxConsumers;
+ }
+
+ public boolean isDeleteOnNoConsumers() {
+ return deleteOnNoConsumers;
+ }
+
+ public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
+ this.deleteOnNoConsumers = deleteOnNoConsumers;
+ }
+
public boolean isAutoCreated() {
return autoCreated;
}
@@ -71,12 +107,18 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeBoolean(autoCreated);
+ buffer.writeByte(routingType.getType());
+ buffer.writeInt(maxConsumers);
+ buffer.writeBoolean(deleteOnNoConsumers);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
autoCreated = buffer.readBoolean();
+ routingType = RoutingType.getType(buffer.readByte());
+ maxConsumers = buffer.readInt();
+ deleteOnNoConsumers = buffer.readBoolean();
}
@Override
@@ -84,6 +126,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
final int prime = 31;
int result = super.hashCode();
result = prime * result + (autoCreated ? 1231 : 1237);
+ result = prime * result + (routingType.getType());
+ result = prime * result + (maxConsumers);
+ result = prime * result + (deleteOnNoConsumers ? 1231 : 1237);
return result;
}
@@ -98,6 +143,17 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
CreateQueueMessage_V2 other = (CreateQueueMessage_V2) obj;
if (autoCreated != other.autoCreated)
return false;
+ if (maxConsumers != other.maxConsumers)
+ return false;
+ if (deleteOnNoConsumers != other.deleteOnNoConsumers)
+ return false;
+ if (deleteOnNoConsumers != other.deleteOnNoConsumers)
+ return false;
+ if (routingType == null) {
+ if (other.routingType != null)
+ return false;
+ } else if (!routingType.equals(other.routingType))
+ return false;
return true;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java
deleted file mode 100644
index fb5c9ef..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V3.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.RoutingType;
-
-public class CreateQueueMessage_V3 extends CreateQueueMessage_V2 {
-
- private RoutingType routingType;
-
- private int maxConsumers;
-
- private boolean deleteOnNoConsumers;
-
- public CreateQueueMessage_V3(final SimpleString address,
- final SimpleString queueName,
- final RoutingType routingType,
- final SimpleString filterString,
- final boolean durable,
- final boolean temporary,
- final int maxConsumers,
- final boolean deleteOnNoConsumers,
- final boolean autoCreated,
- final boolean requiresResponse) {
- this();
-
- this.address = address;
- this.queueName = queueName;
- this.filterString = filterString;
- this.durable = durable;
- this.temporary = temporary;
- this.autoCreated = autoCreated;
- this.requiresResponse = requiresResponse;
- this.routingType = routingType;
- this.maxConsumers = maxConsumers;
- this.deleteOnNoConsumers = deleteOnNoConsumers;
- }
-
- public CreateQueueMessage_V3() {
- super(CREATE_QUEUE_V3);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public String toString() {
- StringBuffer buff = new StringBuffer(super.getParentString());
- buff.append(", routingType=" + routingType);
- buff.append(", maxConsumers=" + maxConsumers);
- buff.append(", deleteOnNoConsumers=" + deleteOnNoConsumers);
- buff.append("]");
- return buff.toString();
- }
-
- public RoutingType getRoutingType() {
- return routingType;
- }
-
- public void setRoutingType(RoutingType routingType) {
- this.routingType = routingType;
- }
-
- public int getMaxConsumers() {
- return maxConsumers;
- }
-
- public void setMaxConsumers(int maxConsumers) {
- this.maxConsumers = maxConsumers;
- }
-
- public boolean isDeleteOnNoConsumers() {
- return deleteOnNoConsumers;
- }
-
- public void setDeleteOnNoConsumers(boolean deleteOnNoConsumers) {
- this.deleteOnNoConsumers = deleteOnNoConsumers;
- }
-
- @Override
- public void encodeRest(final ActiveMQBuffer buffer) {
- super.encodeRest(buffer);
- buffer.writeByte(routingType.getType());
- buffer.writeInt(maxConsumers);
- buffer.writeBoolean(deleteOnNoConsumers);
- }
-
- @Override
- public void decodeRest(final ActiveMQBuffer buffer) {
- super.decodeRest(buffer);
- routingType = RoutingType.getType(buffer.readByte());
- maxConsumers = buffer.readInt();
- deleteOnNoConsumers = buffer.readBoolean();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result + (routingType.getType());
- result = prime * result + (maxConsumers);
- result = prime * result + (deleteOnNoConsumers ? 1231 : 1237);
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (!super.equals(obj))
- return false;
- if (!(obj instanceof CreateQueueMessage_V3))
- return false;
- CreateQueueMessage_V3 other = (CreateQueueMessage_V3) obj;
- if (autoCreated != other.autoCreated)
- return false;
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
index 7c45ca7..40b9cb5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.RoutingType;
public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
index 8b09827..bab092d 100644
--- a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
@@ -16,16 +16,12 @@
*/
package org.apache.activemq.artemis.junit;
-import java.util.Collections;
-import java.util.HashSet;
-
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.RoutingType;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
/**
* A JUnit Rule that embeds an ActiveMQ Artemis ClientConsumer into a test.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 185d5c4..9e103f4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -77,9 +78,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer;
-import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -563,12 +564,16 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
@Override
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
- @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception {
+ @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception {
checkStarted();
clearIO();
try {
- server.createAddressInfo(new AddressInfo(new SimpleString(name), routingTypes));
+ Set<RoutingType> set = new HashSet<>();
+ for (Object routingType : routingTypes) {
+ set.add(RoutingType.valueOf(routingType.toString()));
+ }
+ server.createAddressInfo(new AddressInfo(new SimpleString(name), set));
} finally {
blockOnIO();
}
@@ -580,7 +585,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- server.removeAddressInfo(new SimpleString(name));
+ server.removeAddressInfo(new SimpleString(name), null);
} finally {
blockOnIO();
}
@@ -642,14 +647,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
@Override
- public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
- @Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType,
- @Parameter(name = "name", desc = "Name of the queue") String name,
- @Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
- @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
- @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
- @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
- @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception {
+ public void createQueue(String address,
+ String routingType,
+ String name,
+ String filterStr,
+ boolean durable,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
+ boolean autoCreateAddress) throws Exception {
checkStarted();
clearIO();
@@ -660,7 +665,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
- server.createQueue(SimpleString.toSimpleString(address), routingType, new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+ server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType), new SimpleString(name), filter, durable, false, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
} finally {
blockOnIO();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index c4d25ac..3bbbac8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -332,6 +332,30 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
+ public int getMaxConsumers() {
+ checkStarted();
+
+ clearIO();
+ try {
+ return queue.getMaxConsumers();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
+ public boolean isDeleteOnNoConsumers() {
+ checkStarted();
+
+ clearIO();
+ try {
+ return queue.isDeleteOnNoConsumers();
+ } finally {
+ blockOnIO();
+ }
+ }
+
+ @Override
public Map<String, Object>[] listScheduledMessages() throws Exception {
checkStarted();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 5c43683..65ffc69 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQEx
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -80,7 +79,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -91,7 +89,6 @@ import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V3;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
@@ -251,23 +248,6 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case CREATE_QUEUE_V2: {
CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
requiresResponse = request.isRequiresResponse();
- session.createQueue(request.getAddress(),
- request.getQueueName(),
- RoutingType.MULTICAST,
- request.getFilterString(),
- request.isTemporary(),
- request.isDurable(),
- Queue.MAX_CONSUMERS_UNLIMITED,
- false,
- request.isAutoCreated());
- if (requiresResponse) {
- response = new NullResponseMessage();
- }
- break;
- }
- case CREATE_QUEUE_V3: {
- CreateQueueMessage_V3 request = (CreateQueueMessage_V3) packet;
- requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isDeleteOnNoConsumers(),
request.isAutoCreated());
if (requiresResponse) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java
index abea943..92e92df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/security/CheckType.java
@@ -35,6 +35,12 @@ public enum CheckType {
return role.isCreateAddress();
}
},
+ DELETE_ADDRESS {
+ @Override
+ public boolean hasRole(final Role role) {
+ return role.isDeleteAddress();
+ }
+ },
CREATE_DURABLE_QUEUE {
@Override
public boolean hasRole(final Role role) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 74292c6..0aeaf6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -456,7 +456,7 @@ public interface ActiveMQServer extends ActiveMQComponent {
AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
- void removeAddressInfo(SimpleString address) throws Exception;
+ void removeAddressInfo(SimpleString address, SecurityAuth session) throws Exception;
String getInternalNamingPrefix();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index d9c2199..f6a0ebd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -110,7 +110,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
-import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -123,6 +122,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
@@ -1477,7 +1477,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final SimpleString filterString,
final boolean durable,
final boolean temporary) throws Exception {
- return createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable, temporary);
+ return createQueue(address, ActiveMQDefaultConfiguration.getDefaultRoutingType(), queueName, filterString, durable, temporary);
}
@Override
@@ -1717,7 +1717,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (autoDeleteAddress && postOffice != null) {
try {
- removeAddressInfo(address);
+ removeAddressInfo(address, session);
} catch (ActiveMQDeleteAddressException e) {
// Could be thrown if the address has bindings or is not deletable.
}
@@ -2392,7 +2392,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
- public void removeAddressInfo(SimpleString address) throws Exception {
+ public void removeAddressInfo(final SimpleString address,
+ final SecurityAuth session) throws Exception {
+ if (session != null) {
+ securityStore.check(address, CheckType.DELETE_ADDRESS, session);
+ }
+
AddressInfo addressInfo = getAddressInfo(address);
if (postOffice.removeAddressInfo(address) == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
@@ -2451,7 +2456,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
AddressInfo defaultAddressInfo = new AddressInfo(addressName);
- defaultAddressInfo.addRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType());
+ defaultAddressInfo.addRoutingType(routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
AddressInfo info = postOffice.getAddressInfo(addressName);
if (info == null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 64d6dd5..7816cde 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.server.impl;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 4cfa57a..213f965 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -719,7 +719,8 @@ public class ManagementServiceImpl implements ManagementService {
paramTypes[i] == Long.TYPE && params[i].getClass() == Long.class ||
paramTypes[i] == Double.TYPE && params[i].getClass() == Double.class ||
paramTypes[i] == Integer.TYPE && params[i].getClass() == Integer.class ||
- paramTypes[i] == Boolean.TYPE && params[i].getClass() == Boolean.class) {
+ paramTypes[i] == Boolean.TYPE && params[i].getClass() == Boolean.class ||
+ paramTypes[i] == Object[].class && params[i].getClass() == Object[].class) {
// parameter match
} else {
match = false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java
index 0e9b2e9..0c63e4c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteJmsDestinationTest.java
@@ -82,7 +82,7 @@ public class AutoDeleteJmsDestinationTest extends JMSTestBase {
@Test
public void testAutoDeleteNegative() throws Exception {
- server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteJmsQueues(false));
+ server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoDeleteQueues(false));
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java
index b3c7a4c..0882078 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java
@@ -230,11 +230,11 @@ public class SessionTest extends ActiveMQTestBase {
@Test
public void testQueueQueryNoQ() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(false));
- server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false));
cf = createSessionFactory(locator);
ClientSession clientSession = cf.createSession(false, true, true);
QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
Assert.assertFalse(resp.isExists());
+ Assert.assertFalse(resp.isAutoCreateJmsQueues());
Assert.assertEquals(null, resp.getAddress());
clientSession.close();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index c4a388e..4ca23c4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.AddressSettingsInfo;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.DivertControl;
@@ -52,6 +53,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
@@ -252,6 +254,41 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
}
@Test
+ public void testCreateAndDestroyQueue_4() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString name = RandomUtil.randomSimpleString();
+ boolean durable = RandomUtil.randomBoolean();
+ boolean deleteOnNoConsumers = RandomUtil.randomBoolean();
+ boolean autoCreateAddress = true;
+ int maxConsumers = RandomUtil.randomInt();
+
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+
+ serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
+
+ checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, mbeanServer);
+ Assert.assertEquals(address.toString(), queueControl.getAddress());
+ Assert.assertEquals(name.toString(), queueControl.getName());
+ Assert.assertNull(queueControl.getFilter());
+ Assert.assertEquals(durable, queueControl.isDurable());
+ Assert.assertEquals(deleteOnNoConsumers, queueControl.isDeleteOnNoConsumers());
+ Assert.assertEquals(maxConsumers, queueControl.getMaxConsumers());
+ Assert.assertEquals(false, queueControl.isTemporary());
+
+ checkResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
+ AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mbeanServer);
+ Assert.assertEquals(address.toString(), addressControl.getAddress());
+
+ serverControl.destroyQueue(name.toString(), true, true);
+
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name));
+ checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
+ }
+
+ @Test
public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString name = RandomUtil.randomSimpleString();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 3dc5fb2..193c58c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -17,13 +17,11 @@
package org.apache.activemq.artemis.tests.integration.management;
import java.util.Map;
-import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
-import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTest {
@@ -103,15 +101,15 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
- public void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
- @Parameter(name = "routingType", desc = "The routing type used for this address, 0=multicast, 1=anycast") RoutingType routingType,
- @Parameter(name = "name", desc = "Name of the queue") String name,
- @Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
- @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
- @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
- @Parameter(name = "deleteOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean deleteOnNoConsumers,
- @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception {
-
+ public void createQueue(String address,
+ String routingType,
+ String name,
+ String filterStr,
+ boolean durable,
+ int maxConsumers,
+ boolean deleteOnNoConsumers,
+ boolean autoCreateAddress) throws Exception {
+ proxy.invokeOperation("createQueue", address, routingType, name, filterStr, durable, maxConsumers, deleteOnNoConsumers, autoCreateAddress);
}
@@ -158,9 +156,10 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
- public void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name,
- @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
+ public void destroyQueue(String name,
+ boolean removeConsumers,
boolean autoDeleteAddress) throws Exception {
+ proxy.invokeOperation("destroyQueue", name, removeConsumers, autoDeleteAddress);
}
@Override
@@ -567,7 +566,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
@Override
public void createAddress(@Parameter(name = "name", desc = "The name of the address") String name,
- @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Set<RoutingType> routingTypes) throws Exception {
+ @Parameter(name = "deliveryMode", desc = "The delivery modes enabled for this address'") Object[] routingTypes) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 250289a..34a6e4f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -82,6 +82,16 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
}
@Override
+ public int getMaxConsumers() {
+ return (Integer) proxy.retrieveAttributeValue("maxConsumers");
+ }
+
+ @Override
+ public boolean isDeleteOnNoConsumers() {
+ return (Boolean) proxy.retrieveAttributeValue("deleteOnNoConsumers");
+ }
+
+ @Override
public int getDeliveringCount() {
return (Integer) proxy.retrieveAttributeValue("deliveringCount", Integer.class);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/PredefinedQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/PredefinedQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/PredefinedQueueTest.java
index 93eec9c..0df3846 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/PredefinedQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/PredefinedQueueTest.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
-import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 45946ed..60ce168 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -83,7 +83,11 @@ public class StompTest extends StompTestBase {
boolean connected = conn != null && conn.isConnected();
log.debug("Connection 1.0 connected: " + connected);
if (connected) {
- conn.disconnect();
+ try {
+ conn.disconnect();
+ } catch (Exception e) {
+ // ignore
+ }
}
} finally {
super.tearDown();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AutoAckMesageListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AutoAckMesageListenerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AutoAckMesageListenerTest.java
deleted file mode 100644
index e7877ce..0000000
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AutoAckMesageListenerTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jms.tests;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-public class AutoAckMesageListenerTest extends JMSTestCase {
-
- // Constants -----------------------------------------------------
-
- private static final JmsTestLogger log = JmsTestLogger.LOGGER;
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- @Test
- public void testAutoAckMsgListenerQueue() throws Exception {
- Connection conn = null;
-
- try {
- CountDownLatch latch = new CountDownLatch(1);
-
- conn = createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(queue1);
- MessageConsumer consumer = session.createConsumer(queue1);
- AutoAckMsgListener listener = new AutoAckMsgListener(latch, session);
- consumer.setMessageListener(listener);
-
- // create and send messages
- log.info("Send and receive two message");
- Message messageSent = session.createMessage();
- messageSent.setBooleanProperty("last", false);
- producer.send(messageSent);
- messageSent.setBooleanProperty("last", true);
- producer.send(messageSent);
-
- conn.start();
-
- // wait until message is received
- log.info("waiting until message has been received by message listener...");
- latch.await(10, TimeUnit.SECONDS);
-
- // check message listener status
- if (listener.getPassed() == false) {
- throw new Exception("failed");
- }
- } finally {
- if (conn != null) {
- conn.close();
- }
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- private static class AutoAckMsgListener implements MessageListener {
-
- private boolean passed;
-
- private final Session session;
-
- private final CountDownLatch monitor;
-
- private AutoAckMsgListener(CountDownLatch latch, Session session) {
- this.monitor = latch;
- this.session = session;
- }
-
- // get state of test
- public boolean getPassed() {
- return passed;
- }
-
- // will receive two messages
- @Override
- public void onMessage(Message message) {
- try {
- if (message.getBooleanProperty("last") == false) {
- log.info("Received first message.");
- if (message.getJMSRedelivered() == true) {
- // should not re-receive this one
- log.info("Error: received first message twice");
- passed = false;
- }
- } else {
- if (message.getJMSRedelivered() == false) {
- // received second message for first time
- log.info("Received second message. Calling recover()");
- session.recover();
- } else {
- // should be redelivered after recover
- log.info("Received second message again as expected");
- passed = true;
- monitor.countDown();
- }
- }
- } catch (JMSException e) {
- log.warn("Exception caught in message listener:\n" + e);
- passed = false;
- monitor.countDown();
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AutoAckMessageListenerTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AutoAckMessageListenerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AutoAckMessageListenerTest.java
new file mode 100644
index 0000000..b667c58
--- /dev/null
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AutoAckMessageListenerTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jms.tests;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+public class AutoAckMessageListenerTest extends JMSTestCase {
+
+ // Constants -----------------------------------------------------
+
+ private static final JmsTestLogger log = JmsTestLogger.LOGGER;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Test
+ public void testAutoAckMsgListenerQueue() throws Exception {
+ Connection conn = null;
+
+ try {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ conn = createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue1);
+ MessageConsumer consumer = session.createConsumer(queue1);
+ AutoAckMsgListener listener = new AutoAckMsgListener(latch, session);
+ consumer.setMessageListener(listener);
+
+ // send the same Message object twice: first with "last" = false, then with "last" = true
+ log.info("Send and receive two message");
+ Message messageSent = session.createMessage();
+ messageSent.setBooleanProperty("last", false);
+ producer.send(messageSent);
+ messageSent.setBooleanProperty("last", true);
+ producer.send(messageSent);
+
+ conn.start();
+
+ // block for at most 10 seconds; the listener counts the latch down on success or on a JMSException
+ log.info("waiting until message has been received by message listener...");
+ latch.await(10, TimeUnit.SECONDS);
+
+ // the result of await() is not inspected; the verdict comes solely from the listener's passed flag
+ if (listener.getPassed() == false) {
+ throw new Exception("failed");
+ }
+ } finally {
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ private static class AutoAckMsgListener implements MessageListener {
+
+ private boolean passed;
+
+ private final Session session;
+
+ private final CountDownLatch monitor;
+
+ private AutoAckMsgListener(CountDownLatch latch, Session session) {
+ this.monitor = latch;
+ this.session = session;
+ }
+
+ // test verdict: set to true only when the "last" message arrives flagged as redelivered (plain, non-volatile field)
+ public boolean getPassed() {
+ return passed;
+ }
+
+ // calls session.recover() on the first delivery of the "last" message and reports success on its redelivery
+ @Override
+ public void onMessage(Message message) {
+ try {
+ if (message.getBooleanProperty("last") == false) {
+ log.info("Received first message.");
+ if (message.getJMSRedelivered() == true) {
+ // should not re-receive this one; note the latch is not counted down on this path
+ log.info("Error: received first message twice");
+ passed = false;
+ }
+ } else {
+ if (message.getJMSRedelivered() == false) {
+ // received second message for first time
+ log.info("Received second message. Calling recover()");
+ session.recover();
+ } else {
+ // should be redelivered after recover
+ log.info("Received second message again as expected");
+ passed = true;
+ monitor.countDown();
+ }
+ }
+ } catch (JMSException e) {
+ log.warn("Exception caught in message listener:\n" + e);
+ passed = false;
+ monitor.countDown();
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c480351c/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
----------------------------------------------------------------------
diff --git a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
index bbe99fc..8ea7828 100644
--- a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
+++ b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java
@@ -34,7 +34,6 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
-import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
import org.objectweb.jtests.jms.admin.Admin;
@@ -148,9 +147,8 @@ public class AbstractAdmin implements Admin {
@Override
public void createTopic(final String name) {
- Boolean result;
try {
- invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, (int) RoutingType.MULTICAST.getType(), false, -1);
+ invokeSyncOperation(ResourceNames.BROKER, "createAddress", name, new Object[]{"MULTICAST"});
} catch (Exception e) {
throw new IllegalStateException(e);
}