You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/09/26 21:58:49 UTC
[4/6] activemq-artemis git commit: ARTEMIS-737 - added JUnit rules
for Artemis servers and clients
ARTEMIS-737 - added JUnit rules for Artemis servers and clients
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/db095926
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/db095926
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/db095926
Branch: refs/heads/master
Commit: db095926ed1ddb9f59c32650af42e4b19312e76a
Parents: 27ed7ec
Author: Quinn Stevenson <qu...@pronoia-solutions.com>
Authored: Wed Sep 21 15:45:05 2016 -0600
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 26 17:57:52 2016 -0400
----------------------------------------------------------------------
artemis-junit/pom.xml | 70 ++
.../junit/AbstractActiveMQClientResource.java | 157 ++++
.../artemis/junit/ActiveMQConsumerResource.java | 132 +++
.../junit/ActiveMQDynamicProducerResource.java | 166 ++++
.../artemis/junit/ActiveMQProducerResource.java | 273 ++++++
.../artemis/junit/EmbeddedActiveMQResource.java | 878 +++++++++++++++++++
.../artemis/junit/EmbeddedJMSResource.java | 749 ++++++++++++++++
.../junit/ActiveMQConsumerResourceTest.java | 87 ++
.../ActiveMQDynamicProducerResourceTest.java | 97 ++
...ucerResourceWithoutAddressExceptionTest.java | 75 ++
...namicProducerResourceWithoutAddressTest.java | 105 +++
.../junit/ActiveMQProducerResourceTest.java | 85 ++
...ActiveMQResourceCustomConfigurationTest.java | 57 ++
...edActiveMQResourceFileConfigurationTest.java | 46 +
.../junit/EmbeddedActiveMQResourceTest.java | 88 ++
...MSResourceMultipleFileConfigurationTest.java | 79 ++
.../junit/EmbeddedJMSResourceQueueTest.java | 102 +++
...dJMSResourceSingleFileConfigurationTest.java | 79 ++
.../junit/EmbeddedJMSResourceTopicTest.java | 127 +++
.../MultipleEmbeddedActiveMQResourcesTest.java | 67 ++
.../junit/MultipleEmbeddedJMSResourcesTest.java | 54 ++
.../resources/embedded-artemis-jms-only.xml | 29 +
.../resources/embedded-artemis-jms-server.xml | 40 +
.../embedded-artemis-minimal-server.xml | 31 +
.../test/resources/embedded-artemis-server.xml | 41 +
.../src/test/resources/logging.properties | 61 ++
pom.xml | 1 +
27 files changed, 3776 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-junit/pom.xml b/artemis-junit/pom.xml
new file mode 100644
index 0000000..89aa47f
--- /dev/null
+++ b/artemis-junit/pom.xml
@@ -0,0 +1,70 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-pom</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>artemis-junit</artifactId>
+ <packaging>jar</packaging>
+ <name>ActiveMQ Artemis JUnit Rules</name>
+
+ <properties>
+ <activemq.basedir>${project.basedir}/..</activemq.basedir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!--
+ -->
+ <dependency>
+ <groupId>org.jboss.logmanager</groupId>
+ <artifactId>jboss-logmanager</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-jms-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-jms-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/AbstractActiveMQClientResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/AbstractActiveMQClientResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/AbstractActiveMQClientResource.java
new file mode 100644
index 0000000..74b9db8
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/AbstractActiveMQClientResource.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractActiveMQClientResource extends ExternalResource {
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ boolean autoCreateQueue = true;
+
+ ServerLocator serverLocator;
+ ClientSessionFactory sessionFactory;
+ ClientSession session;
+
+ public AbstractActiveMQClientResource(String url) {
+ if (url == null) {
+ throw new IllegalArgumentException(String.format("Error creating {} - url cannot be null", this.getClass().getSimpleName()));
+ }
+
+ try {
+ this.serverLocator = ActiveMQClient.createServerLocator(url);
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(String.format("Error creating {} - createServerLocator( {} ) failed", this.getClass().getSimpleName(), url.toString()), ex);
+ }
+ }
+
+ public AbstractActiveMQClientResource(ServerLocator serverLocator) {
+ if (serverLocator == null) {
+ throw new IllegalArgumentException(String.format("Error creating {} - ServerLocator cannot be null", this.getClass().getSimpleName()));
+ }
+
+ this.serverLocator = serverLocator;
+ }
+
+ /**
+ * Adds properties to a ClientMessage
+ *
+ * @param message
+ * @param properties
+ */
+ public static void addMessageProperties(ClientMessage message, Map<String, Object> properties) {
+ if (properties != null && properties.size() > 0) {
+ for (Map.Entry<String, Object> property : properties.entrySet()) {
+ message.putObjectProperty(property.getKey(), property.getValue());
+ }
+ }
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ super.before();
+ start();
+ }
+
+ @Override
+ protected void after() {
+ stop();
+ super.after();
+ }
+
+ void start() {
+ log.info("Starting {}", this.getClass().getSimpleName());
+ try {
+ sessionFactory = serverLocator.createSessionFactory();
+ session = sessionFactory.createSession();
+ }
+ catch (RuntimeException runtimeEx) {
+ throw runtimeEx;
+ }
+ catch (Exception ex) {
+ throw new ActiveMQClientResourceException(String.format("%s initialisation failure", this.getClass().getSimpleName()), ex);
+ }
+
+ createClient();
+
+ try {
+ session.start();
+ }
+ catch (ActiveMQException amqEx) {
+ throw new ActiveMQClientResourceException(String.format("%s startup failure", this.getClass().getSimpleName()), amqEx);
+ }
+ }
+
+ void stop() {
+ stopClient();
+ if (session != null) {
+ try {
+ session.close();
+ }
+ catch (ActiveMQException amqEx) {
+ log.warn("ActiveMQException encountered closing InternalClient ClientSession - ignoring", amqEx);
+ }
+ finally {
+ session = null;
+ }
+ }
+ if (sessionFactory != null) {
+ sessionFactory.close();
+ sessionFactory = null;
+ }
+ if (serverLocator != null) {
+ serverLocator.close();
+ serverLocator = null;
+ }
+
+ }
+
+ protected abstract void createClient();
+
+ protected abstract void stopClient();
+
+ public boolean isAutoCreateQueue() {
+ return autoCreateQueue;
+ }
+
+ public void setAutoCreateQueue(boolean autoCreateQueue) {
+ this.autoCreateQueue = autoCreateQueue;
+ }
+
+ public static class ActiveMQClientResourceException extends RuntimeException {
+
+ public ActiveMQClientResourceException(String message) {
+ super(message);
+ }
+
+ public ActiveMQClientResourceException(String message, Exception cause) {
+ super(message, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
new file mode 100644
index 0000000..2e3088c
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQConsumerResource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+
+public class ActiveMQConsumerResource extends AbstractActiveMQClientResource {
+
+ long defaultReceiveTimeout = 50;
+
+ SimpleString queueName;
+ ClientConsumer consumer;
+
+ public ActiveMQConsumerResource(String url, SimpleString queueName) {
+ super(url);
+ this.queueName = queueName;
+ }
+
+ public ActiveMQConsumerResource(ServerLocator serverLocator, SimpleString queueName) {
+ super(serverLocator);
+ this.queueName = queueName;
+ }
+
+ public long getDefaultReceiveTimeout() {
+ return defaultReceiveTimeout;
+ }
+
+ /**
+ * Sets the default timeout in milliseconds used when receiving messages. Defaults to 50 milliseconds
+ *
+ * @param defaultReceiveTimeout received timeout in milliseconds
+ */
+ public void setDefaultReceiveTimeout(long defaultReceiveTimeout) {
+ this.defaultReceiveTimeout = defaultReceiveTimeout;
+ }
+
+ @Override
+ protected void createClient() {
+ boolean browseOnly = false;
+ try {
+ if (!session.queueQuery(queueName).isExists() && autoCreateQueue) {
+ log.warn("{}: queue does not exist - creating queue: address = {}, name = {}", this.getClass().getSimpleName(), queueName.toString(), queueName.toString());
+ session.createQueue(queueName, queueName);
+ }
+ consumer = session.createConsumer(queueName, browseOnly);
+ }
+ catch (ActiveMQException amqEx) {
+ throw new ActiveMQClientResourceException(String.format("Error creating consumer for queueName %s", queueName.toString()), amqEx);
+ }
+ }
+
+ @Override
+ protected void stopClient() {
+ if (consumer != null) {
+ try {
+ consumer.close();
+ }
+ catch (ActiveMQException amqEx) {
+ log.warn("Exception encountered closing consumer - ignoring", amqEx);
+ }
+ finally {
+ consumer = null;
+ }
+ }
+ }
+
+ public boolean isAutoCreateQueue() {
+ return autoCreateQueue;
+ }
+
+ /**
+ * Enable/Disable the automatic creation of non-existant queues. The default is to automatically create non-existant queues
+ *
+ * @param autoCreateQueue
+ */
+ public void setAutoCreateQueue(boolean autoCreateQueue) {
+ this.autoCreateQueue = autoCreateQueue;
+ }
+
+ public ClientMessage receiveMessage() {
+ return receiveMessage(defaultReceiveTimeout);
+ }
+
+ public ClientMessage receiveMessage(long timeout) {
+ ClientMessage message = null;
+ if (timeout > 0) {
+ try {
+ message = consumer.receive(timeout);
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive( timeout = %d ) for %s failed", timeout, queueName.toString()), amqEx);
+ }
+ }
+ else if (timeout == 0) {
+ try {
+ message = consumer.receiveImmediate();
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receiveImmediate() for %s failed", queueName.toString()), amqEx);
+ }
+ }
+ else {
+ try {
+ message = consumer.receive();
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResource.EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive() for %s failed", queueName.toString()), amqEx);
+ }
+ }
+
+ return message;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQDynamicProducerResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQDynamicProducerResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQDynamicProducerResource.java
new file mode 100644
index 0000000..477dd39
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQDynamicProducerResource.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+
+public class ActiveMQDynamicProducerResource extends ActiveMQProducerResource {
+
+ public ActiveMQDynamicProducerResource(String url) {
+ super(url);
+ }
+
+ public ActiveMQDynamicProducerResource(ServerLocator serverLocator) {
+ super(serverLocator);
+ }
+
+ public ActiveMQDynamicProducerResource(String url, SimpleString address) {
+ super(url, address);
+ }
+
+ public ActiveMQDynamicProducerResource(ServerLocator serverLocator, SimpleString address) {
+ super(serverLocator, address);
+ }
+
+ @Override
+ protected void createClient() {
+ try {
+ if (address != null && !session.addressQuery(address).isExists() && autoCreateQueue) {
+ log.warn("queue does not exist - creating queue: address = {}, name = {}", address.toString(), address.toString());
+ session.createQueue(address, address);
+ }
+ producer = session.createProducer((SimpleString) null);
+ }
+ catch (ActiveMQException amqEx) {
+ if (address == null) {
+ throw new ActiveMQClientResourceException(String.format("Error creating producer for address %s", address.toString()), amqEx);
+ }
+ else {
+ throw new ActiveMQClientResourceException("Error creating producer", amqEx);
+ }
+ }
+ }
+
+ /**
+ * Send a ClientMessage to the default address on the server
+ *
+ * @param message the message to send
+ */
+ @Override
+ public void sendMessage(ClientMessage message) {
+ sendMessage(address, message);
+ }
+
+ /**
+ * Send a ClientMessage to the specified address on the server
+ *
+ * @param targetAddress the target address
+ * @param message the message to send
+ */
+ public void sendMessage(SimpleString targetAddress, ClientMessage message) {
+ if (targetAddress == null) {
+ throw new IllegalArgumentException(String.format("%s error - address cannot be null", this.getClass().getSimpleName()));
+ }
+ try {
+ if (autoCreateQueue && !session.addressQuery(targetAddress).isExists()) {
+ log.warn("queue does not exist - creating queue: address = {}, name = {}", address.toString(), address.toString());
+ session.createQueue(targetAddress, targetAddress);
+ }
+ }
+ catch (ActiveMQException amqEx) {
+ throw new ActiveMQClientResourceException(String.format("Queue creation failed for queue: address = %s, name = %s", address.toString(), address.toString()));
+ }
+
+ try {
+ producer.send(targetAddress, message);
+ }
+ catch (ActiveMQException amqEx) {
+ throw new ActiveMQClientResourceException(String.format("Failed to send message to %s", targetAddress.toString()), amqEx);
+ }
+ }
+
+ /**
+ * Create a new ClientMessage with the specified body and send to the specified address on the server
+ *
+ * @param targetAddress the target address
+ * @param body the body for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(SimpleString targetAddress, byte[] body) {
+ ClientMessage message = createMessage(body);
+ sendMessage(targetAddress, message);
+ return message;
+ }
+
+ /**
+ * Create a new ClientMessage with the specified body and send to the server
+ *
+ * @param targetAddress the target address
+ * @param body the body for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(SimpleString targetAddress, String body) {
+ ClientMessage message = createMessage(body);
+ sendMessage(targetAddress, message);
+ return message;
+ }
+
+ /**
+ * Create a new ClientMessage with the specified properties and send to the server
+ *
+ * @param targetAddress the target address
+ * @param properties the properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(SimpleString targetAddress, Map<String, Object> properties) {
+ ClientMessage message = createMessage(properties);
+ sendMessage(targetAddress, message);
+ return message;
+ }
+
+ /**
+ * Create a new ClientMessage with the specified body and and properties and send to the server
+ *
+ * @param targetAddress the target address
+ * @param properties the properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(SimpleString targetAddress, byte[] body, Map<String, Object> properties) {
+ ClientMessage message = createMessage(body);
+ sendMessage(targetAddress, message);
+ return message;
+ }
+
+ /**
+ * Create a new ClientMessage with the specified body and and properties and send to the server
+ *
+ * @param targetAddress the target address
+ * @param properties the properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(SimpleString targetAddress, String body, Map<String, Object> properties) {
+ ClientMessage message = createMessage(body);
+ sendMessage(targetAddress, message);
+ return message;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQProducerResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQProducerResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQProducerResource.java
new file mode 100644
index 0000000..d9336d4
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/ActiveMQProducerResource.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+
+public class ActiveMQProducerResource extends AbstractActiveMQClientResource {
+
+ boolean useDurableMessage = true;
+ SimpleString address = null;
+ ClientProducer producer;
+
+ protected ActiveMQProducerResource(String url) {
+ super(url);
+ }
+
+ protected ActiveMQProducerResource(ServerLocator serverLocator) {
+ super(serverLocator);
+ }
+
+ public ActiveMQProducerResource(String url, SimpleString address) {
+ super(url);
+ if (address == null) {
+ throw new IllegalArgumentException(String.format("%s construction error - address cannot be null", this.getClass().getSimpleName()));
+ }
+ this.address = address;
+ }
+
+ public ActiveMQProducerResource(ServerLocator serverLocator, SimpleString address) {
+ super(serverLocator);
+ if (address == null) {
+ throw new IllegalArgumentException(String.format("%s construction error - address cannot be null", this.getClass().getSimpleName()));
+ }
+ this.address = address;
+ }
+
+ public boolean isUseDurableMessage() {
+ return useDurableMessage;
+ }
+
+ /**
+ * Disables/Enables creating durable messages. By default, durable messages are created
+ *
+ * @param useDurableMessage if true, durable messages will be created
+ */
+ public void setUseDurableMessage(boolean useDurableMessage) {
+ this.useDurableMessage = useDurableMessage;
+ }
+
+ @Override
+ protected void createClient() {
+ try {
+ if (!session.addressQuery(address).isExists() && autoCreateQueue) {
+ log.warn("{}: queue does not exist - creating queue: address = {}, name = {}", this.getClass().getSimpleName(), address.toString(), address.toString());
+ session.createQueue(address, address);
+ }
+ producer = session.createProducer(address);
+ }
+ catch (ActiveMQException amqEx) {
+ throw new ActiveMQClientResourceException(String.format("Error creating producer for address %s", address.toString()), amqEx);
+ }
+ }
+
+ @Override
+ protected void stopClient() {
+ if (producer != null) {
+ try {
+ producer.close();
+ }
+ catch (ActiveMQException amqEx) {
+ log.warn("ActiveMQException encountered closing InternalClient ClientProducer - ignoring", amqEx);
+ }
+ finally {
+ producer = null;
+ }
+ }
+ }
+
+ /**
+ * Create a ClientMessage
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @return a new ClientMessage
+ */
+ public ClientMessage createMessage() {
+ if (session == null) {
+ throw new IllegalStateException("ClientSession is null");
+ }
+ return session.createMessage(isUseDurableMessage());
+ }
+
+ /**
+ * Create a ClientMessage with the specified body
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param body the body for the new message
+ * @return a new ClientMessage with the specified body
+ */
+ public ClientMessage createMessage(byte[] body) {
+ ClientMessage message = createMessage();
+
+ if (body != null) {
+ message.writeBodyBufferBytes(body);
+ }
+
+ return message;
+ }
+
+ /**
+ * Create a ClientMessage with the specified body
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param body the body for the new message
+ * @return a new ClientMessage with the specified body
+ */
+ public ClientMessage createMessage(String body) {
+ ClientMessage message = createMessage();
+
+ if (body != null) {
+ message.writeBodyBufferString(body);
+ }
+
+ return message;
+ }
+
+ /**
+ * Create a ClientMessage with the specified message properties
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param properties message properties for the new message
+ * @return a new ClientMessage with the specified message properties
+ */
+ public ClientMessage createMessage(Map<String, Object> properties) {
+ ClientMessage message = createMessage();
+
+ addMessageProperties(message, properties);
+
+ return message;
+ }
+
+ /**
+ * Create a ClientMessage with the specified body and message properties
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param body the body for the new message
+ * @param properties message properties for the new message
+ * @return a new ClientMessage with the specified body and message properties
+ */
+ public ClientMessage createMessage(byte[] body, Map<String, Object> properties) {
+ ClientMessage message = createMessage(body);
+
+ addMessageProperties(message, properties);
+
+ return message;
+ }
+
+ /**
+ * Create a ClientMessage with the specified body and message properties
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param body the body for the new message
+ * @param properties message properties for the new message
+ * @return a new ClientMessage with the specified body and message properties
+ */
+ public ClientMessage createMessage(String body, Map<String, Object> properties) {
+ ClientMessage message = createMessage(body);
+
+ addMessageProperties(message, properties);
+
+ return message;
+ }
+
+ /**
+ * Send a ClientMessage to the server
+ *
+ * @param message the message to send
+ */
+ public void sendMessage(ClientMessage message) {
+ try {
+ producer.send(message);
+ }
+ catch (ActiveMQException amqEx) {
+ throw new ActiveMQClientResourceException(String.format("Failed to send message to %s", producer.getAddress().toString()), amqEx);
+ }
+ }
+
+ /**
+ * Create a new ClientMessage with the specified body and send to the server
+ *
+ * @param body the body for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(byte[] body) {
+ ClientMessage message = createMessage(body);
+ sendMessage(message);
+ return message;
+ }
+
+ /**
+ * Create a new ClientMessage with the specified body and send to the server
+ *
+ * @param body the body for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(String body) {
+ ClientMessage message = createMessage(body);
+ sendMessage(message);
+ return message;
+ }
+
+ /**
+ * Create a new ClientMessage with the specified properties and send to the server
+ *
+ * @param properties the properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(Map<String, Object> properties) {
+ ClientMessage message = createMessage(properties);
+ sendMessage(message);
+ return message;
+ }
+
+ /**
+ * Create a new ClientMessage with the specified body and and properties and send to the server
+ *
+ * @param properties the properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(byte[] body, Map<String, Object> properties) {
+ ClientMessage message = createMessage(body);
+ sendMessage(message);
+ return message;
+ }
+
+ /**
+ * Create a new ClientMessage with the specified body and and properties and send to the server
+ *
+ * @param properties the properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(String body, Map<String, Object> properties) {
+ ClientMessage message = createMessage(body);
+ sendMessage(message);
+ return message;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/db095926/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java
----------------------------------------------------------------------
diff --git a/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java
new file mode 100644
index 0000000..90e567f
--- /dev/null
+++ b/artemis-junit/src/main/java/org/apache/activemq/artemis/junit/EmbeddedActiveMQResource.java
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.junit;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.FileDeploymentManager;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JUnit Rule that embeds an ActiveMQ Artemis server into a test.
+ */
+public class EmbeddedActiveMQResource extends ExternalResource {
+
+ static final String SERVER_NAME = "embedded-server";
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ boolean useDurableMessage = true;
+ boolean useDurableQueue = true;
+ long defaultReceiveTimeout = 50;
+
+ Configuration configuration;
+
+ EmbeddedActiveMQ server;
+
+ InternalClient internalClient;
+
+ /**
+ * Create a default EmbeddedActiveMQResource
+ */
+ public EmbeddedActiveMQResource() {
+ configuration = new ConfigurationImpl().setName(SERVER_NAME).setPersistenceEnabled(false).setSecurityEnabled(false).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ init();
+ }
+
+ /**
+ * Create a default EmbeddedActiveMQResource with the specified serverId
+ *
+ * @param serverId server id
+ */
+ public EmbeddedActiveMQResource(int serverId) {
+ Map<String, Object> params = new HashMap<>();
+ params.put(TransportConstants.SERVER_ID_PROP_NAME, serverId);
+ TransportConfiguration transportConfiguration = new TransportConfiguration(InVMAcceptorFactory.class.getName(), params);
+ configuration = new ConfigurationImpl().setName(SERVER_NAME + "-" + serverId).setPersistenceEnabled(false).setSecurityEnabled(false).addAcceptorConfiguration(transportConfiguration);
+ init();
+ }
+
+ /**
+ * Creates an EmbeddedActiveMQResource using the specified configuration
+ *
+ * @param configuration ActiveMQServer configuration
+ */
+ public EmbeddedActiveMQResource(Configuration configuration) {
+ this.configuration = configuration;
+ init();
+ }
+
+ /**
+ * Creates an EmbeddedActiveMQResource using the specified configuration file
+ *
+ * @param filename ActiveMQServer configuration file name
+ */
+ public EmbeddedActiveMQResource(String filename) {
+ if (filename == null) {
+ throw new IllegalArgumentException("ActiveMQServer configuration file name cannot be null");
+ }
+ FileDeploymentManager deploymentManager = new FileDeploymentManager(filename);
+ FileConfiguration config = new FileConfiguration();
+ deploymentManager.addDeployable(config);
+ try {
+ deploymentManager.readConfiguration();
+ }
+ catch (Exception ex) {
+ throw new EmbeddedActiveMQResourceException(String.format("Failed to read configuration file %s", filename), ex);
+ }
+ this.configuration = config;
+ init();
+ }
+
+ /**
+ * Adds properties to a ClientMessage
+ *
+ * @param message
+ * @param properties
+ */
+ public static void addMessageProperties(ClientMessage message, Map<String, Object> properties) {
+ if (properties != null && properties.size() > 0) {
+ for (Map.Entry<String, Object> property : properties.entrySet()) {
+ message.putObjectProperty(property.getKey(), property.getValue());
+ }
+ }
+ }
+
+ private void init() {
+ if (server == null) {
+ server = new EmbeddedActiveMQ().setConfiguration(configuration);
+ }
+ }
+
+ /**
+ * Start the embedded ActiveMQ Artemis server.
+ * <p/>
+ * The server will normally be started by JUnit using the before() method. This method allows the server to
+ * be started manually to support advanced testing scenarios.
+ */
+ public void start() {
+ try {
+ server.start();
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(String.format("Exception encountered starting %s: %s", server.getClass().getName(), this.getServerName()), ex);
+ }
+
+ configuration = server.getActiveMQServer().getConfiguration();
+ }
+
+ /**
+ * Stop the embedded ActiveMQ Artemis server
+ * <p/>
+ * The server will normally be stopped by JUnit using the after() method. This method allows the server to
+ * be stopped manually to support advanced testing scenarios.
+ */
+ public void stop() {
+ if (internalClient != null) {
+ internalClient.stop();
+ internalClient = null;
+ }
+
+ if (server != null) {
+ try {
+ server.stop();
+ }
+ catch (Exception ex) {
+ log.warn(String.format("Exception encountered stopping %s: %s", server.getClass().getSimpleName(), this.getServerName()), ex);
+ }
+ }
+ }
+
+ /**
+ * Invoked by JUnit to setup the resource - start the embedded ActiveMQ Artemis server
+ * <p/>
+ */
+ @Override
+ protected void before() throws Throwable {
+ log.info("Starting {}: {}", this.getClass().getSimpleName(), getServerName());
+
+ this.start();
+
+ super.before();
+ }
+
+ /**
+ * Invoked by JUnit to tear down the resource - stops the embedded ActiveMQ Artemis server
+ */
+ @Override
+ protected void after() {
+ log.info("Stopping {}: {}", this.getClass().getSimpleName(), getServerName());
+
+ super.after();
+
+ this.stop();
+ }
+
+ public boolean isUseDurableMessage() {
+ return useDurableMessage;
+ }
+
+ /**
+ * Disables/Enables creating durable messages. By default, durable messages are created
+ *
+ * @param useDurableMessage if true, durable messages will be created
+ */
+ public void setUseDurableMessage(boolean useDurableMessage) {
+ this.useDurableMessage = useDurableMessage;
+ }
+
+ public boolean isUseDurableQueue() {
+ return useDurableQueue;
+ }
+
+ /**
+ * Disables/Enables creating durable queues. By default, durable queues are created
+ *
+ * @param useDurableQueue if true, durable messages will be created
+ */
+ public void setUseDurableQueue(boolean useDurableQueue) {
+ this.useDurableQueue = useDurableQueue;
+ }
+
+ public long getDefaultReceiveTimeout() {
+ return defaultReceiveTimeout;
+ }
+
+ /**
+ * Sets the default timeout in milliseconds used when receiving messages. Defaults to 50 milliseconds
+ *
+ * @param defaultReceiveTimeout received timeout in milliseconds
+ */
+ public void setDefaultReceiveTimeout(long defaultReceiveTimeout) {
+ this.defaultReceiveTimeout = defaultReceiveTimeout;
+ }
+
+ /**
+ * Get the EmbeddedActiveMQ server.
+ * <p/>
+ * This may be required for advanced configuration of the EmbeddedActiveMQ server.
+ *
+ * @return the embedded ActiveMQ broker
+ */
+ public EmbeddedActiveMQ getServer() {
+ return server;
+ }
+
+ /**
+ * Get the name of the EmbeddedActiveMQ server
+ *
+ * @return name of the embedded server
+ */
+ public String getServerName() {
+ String name = "unknown";
+ ActiveMQServer activeMQServer = server.getActiveMQServer();
+ if (activeMQServer != null) {
+ name = activeMQServer.getConfiguration().getName();
+ }
+ else if (configuration != null) {
+ name = configuration.getName();
+ }
+
+ return name;
+ }
+
+ /**
+ * Get the VM URL for the embedded EmbeddedActiveMQ server
+ *
+ * @return the VM URL for the embedded server
+ */
+ public String getVmURL() {
+ String vmURL = "vm://0";
+ for (TransportConfiguration transportConfiguration : configuration.getAcceptorConfigurations()) {
+ Map<String, Object> params = transportConfiguration.getParams();
+ if (params != null && params.containsKey(TransportConstants.SERVER_ID_PROP_NAME)) {
+ vmURL = "vm://" + params.get(TransportConstants.SERVER_ID_PROP_NAME);
+ }
+ }
+
+ return vmURL;
+ }
+
+ public long getMessageCount(String queueName) {
+ return getMessageCount(SimpleString.toSimpleString(queueName));
+ }
+
+ /**
+ * Get the number of messages in a specific queue.
+ *
+ * @param queueName the name of the queue
+ * @return the number of messages in the queue; -1 if queue is not found
+ */
+ public long getMessageCount(SimpleString queueName) {
+ Queue queue = locateQueue(queueName);
+ if (queue == null) {
+ log.warn("getMessageCount(queueName) - queue {} not found; returning -1", queueName.toString());
+ return -1;
+ }
+
+ return queue.getMessageCount();
+ }
+
+ public Queue locateQueue(String queueName) {
+ return locateQueue(SimpleString.toSimpleString(queueName));
+ }
+
+ public Queue locateQueue(SimpleString queueName) {
+ return server.getActiveMQServer().locateQueue(queueName);
+ }
+
+ public List<Queue> getBoundQueues(String address) {
+ return getBoundQueues(SimpleString.toSimpleString(address));
+ }
+
+ public List<Queue> getBoundQueues(SimpleString address) {
+ if (address == null) {
+ throw new IllegalArgumentException("getBoundQueues( address ) - address cannot be null");
+ }
+ List<Queue> boundQueues = new java.util.LinkedList<>();
+
+ BindingQueryResult bindingQueryResult = null;
+ try {
+ bindingQueryResult = server.getActiveMQServer().bindingQuery(address);
+ }
+ catch (Exception e) {
+ throw new EmbeddedActiveMQResourceException(String.format("getBoundQueues( %s ) - bindingQuery( %s ) failed", address.toString(), address.toString()));
+ }
+ if (bindingQueryResult.isExists()) {
+ for (SimpleString queueName : bindingQueryResult.getQueueNames()) {
+ boundQueues.add(server.getActiveMQServer().locateQueue(queueName));
+ }
+ }
+ return boundQueues;
+ }
+
+ public Queue createQueue(String name) {
+ return createQueue(SimpleString.toSimpleString(name), SimpleString.toSimpleString(name));
+ }
+
+ public Queue createQueue(String address, String name) {
+ return createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(name));
+ }
+
+ public Queue createQueue(SimpleString address, SimpleString name) {
+ SimpleString filter = null;
+ boolean temporary = false;
+ Queue queue = null;
+ try {
+ queue = server.getActiveMQServer().createQueue(address, name, filter, isUseDurableQueue(), temporary);
+ }
+ catch (Exception ex) {
+ throw new EmbeddedActiveMQResourceException(String.format("Failed to create queue: queueName = %s, name = %s", address.toString(), name.toString()), ex);
+ }
+
+ return queue;
+ }
+
+ public void createSharedQueue(String name, String user) {
+ createSharedQueue(SimpleString.toSimpleString(name), SimpleString.toSimpleString(name), SimpleString.toSimpleString(user));
+ }
+
+ public void createSharedQueue(String address, String name, String user) {
+ createSharedQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(name), SimpleString.toSimpleString(user));
+ }
+
+ public void createSharedQueue(SimpleString address, SimpleString name, SimpleString user) {
+ SimpleString filter = null;
+ try {
+ server.getActiveMQServer().createSharedQueue(address, name, filter, user, isUseDurableQueue());
+ }
+ catch (Exception ex) {
+ throw new EmbeddedActiveMQResourceException(String.format("Failed to create shared queue: queueName = %s, name = %s, user = %s", address.toString(), name.toString(), user.toString()), ex);
+ }
+ }
+
+ /**
+ * Create a ClientMessage
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @return a new ClientMessage
+ */
+ public ClientMessage createMessage() {
+ getInternalClient();
+ return internalClient.createMessage(isUseDurableMessage());
+ }
+
+ /**
+ * Create a ClientMessage with the specified body
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param body the body for the new message
+ * @return a new ClientMessage with the specified body
+ */
+ public ClientMessage createMessage(byte[] body) {
+ getInternalClient();
+ ClientMessage message = internalClient.createMessage(isUseDurableMessage());
+
+ if (body != null) {
+ message.writeBodyBufferBytes(body);
+ }
+
+ return message;
+ }
+
+ /**
+ * Create a ClientMessage with the specified body
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param body the body for the new message
+ * @return a new ClientMessage with the specified body
+ */
+ public ClientMessage createMessage(String body) {
+ getInternalClient();
+ ClientMessage message = internalClient.createMessage(isUseDurableMessage());
+
+ if (body != null) {
+ message.writeBodyBufferString(body);
+ }
+
+ return message;
+ }
+
+ /**
+ * Create a ClientMessage with the specified message properties
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param properties message properties for the new message
+ * @return a new ClientMessage with the specified message properties
+ */
+ public ClientMessage createMessageWithProperties(Map<String, Object> properties) {
+ getInternalClient();
+ ClientMessage message = internalClient.createMessage(isUseDurableMessage());
+
+ addMessageProperties(message, properties);
+
+ return message;
+ }
+
+ /**
+ * Create a ClientMessage with the specified body and message properties
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param body the body for the new message
+ * @param properties message properties for the new message
+ * @return a new ClientMessage with the specified body and message properties
+ */
+ public ClientMessage createMessageWithProperties(byte[] body, Map<String, Object> properties) {
+ ClientMessage message = createMessage(body);
+
+ addMessageProperties(message, properties);
+
+ return message;
+ }
+
+ /**
+ * Create a ClientMessage with the specified body and message properties
+ * <p>
+ * If useDurableMessage is false, a non-durable message is created. Otherwise, a durable message is created
+ *
+ * @param body the body for the new message
+ * @param properties message properties for the new message
+ * @return a new ClientMessage with the specified body and message properties
+ */
+ public ClientMessage createMessageWithProperties(String body, Map<String, Object> properties) {
+ ClientMessage message = createMessage(body);
+
+ addMessageProperties(message, properties);
+
+ return message;
+ }
+
+ /**
+ * Send a message to an address
+ *
+ * @param address the target queueName for the message
+ * @param message the message to send
+ */
+ public void sendMessage(String address, ClientMessage message) {
+ sendMessage(SimpleString.toSimpleString(address), message);
+ }
+
+ /**
+ * Create a new message with the specified body, and send the message to an address
+ *
+ * @param address the target queueName for the message
+ * @param body the body for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(String address, byte[] body) {
+ return sendMessage(SimpleString.toSimpleString(address), body);
+ }
+
+ /**
+ * Create a new message with the specified body, and send the message to an address
+ *
+ * @param address the target queueName for the message
+ * @param body the body for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(String address, String body) {
+ return sendMessage(SimpleString.toSimpleString(address), body);
+ }
+
+ /**
+ * Create a new message with the specified properties, and send the message to an address
+ *
+ * @param address the target queueName for the message
+ * @param properties message properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessageWithProperties(String address, Map<String, Object> properties) {
+ return sendMessageWithProperties(SimpleString.toSimpleString(address), properties);
+ }
+
+ /**
+ * Create a new message with the specified body and properties, and send the message to an address
+ *
+ * @param address the target queueName for the message
+ * @param body the body for the new message
+ * @param properties message properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessageWithProperties(String address, byte[] body, Map<String, Object> properties) {
+ return sendMessageWithProperties(SimpleString.toSimpleString(address), body, properties);
+ }
+
+ /**
+ * Create a new message with the specified body and properties, and send the message to an address
+ *
+ * @param address the target queueName for the message
+ * @param body the body for the new message
+ * @param properties message properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessageWithProperties(String address, String body, Map<String, Object> properties) {
+ return sendMessageWithProperties(SimpleString.toSimpleString(address), body, properties);
+ }
+
+ /**
+ * Send a message to an queueName
+ *
+ * @param address the target queueName for the message
+ * @param message the message to send
+ */
+ public void sendMessage(SimpleString address, ClientMessage message) {
+ if (address == null) {
+ throw new IllegalArgumentException("sendMessage failure - queueName is required");
+ }
+ else if (message == null) {
+ throw new IllegalArgumentException("sendMessage failure - a ClientMessage is required");
+ }
+
+ getInternalClient();
+ internalClient.sendMessage(address, message);
+ }
+
+ /**
+ * Create a new message with the specified body, and send the message to an queueName
+ *
+ * @param address the target queueName for the message
+ * @param body the body for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(SimpleString address, byte[] body) {
+ ClientMessage message = createMessage(body);
+ sendMessage(address, message);
+ return message;
+ }
+
+ /**
+ * Create a new message with the specified body, and send the message to an queueName
+ *
+ * @param address the target queueName for the message
+ * @param body the body for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessage(SimpleString address, String body) {
+ ClientMessage message = createMessage(body);
+ sendMessage(address, message);
+ return message;
+ }
+
+ /**
+ * Create a new message with the specified properties, and send the message to an queueName
+ *
+ * @param address the target queueName for the message
+ * @param properties message properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessageWithProperties(SimpleString address, Map<String, Object> properties) {
+ ClientMessage message = createMessageWithProperties(properties);
+ sendMessage(address, message);
+ return message;
+ }
+
+ /**
+ * Create a new message with the specified body and properties, and send the message to an queueName
+ *
+ * @param address the target queueName for the message
+ * @param body the body for the new message
+ * @param properties message properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessageWithProperties(SimpleString address, byte[] body, Map<String, Object> properties) {
+ ClientMessage message = createMessageWithProperties(body, properties);
+ sendMessage(address, message);
+ return message;
+ }
+
+ /**
+ * Create a new message with the specified body and properties, and send the message to an queueName
+ *
+ * @param address the target queueName for the message
+ * @param body the body for the new message
+ * @param properties message properties for the new message
+ * @return the message that was sent
+ */
+ public ClientMessage sendMessageWithProperties(SimpleString address, String body, Map<String, Object> properties) {
+
+ ClientMessage message = createMessageWithProperties(body, properties);
+ sendMessage(address, message);
+ return message;
+ }
+
+ /**
+ * Receive a message from the specified queue using the default receive timeout
+ *
+ * @param queueName name of the source queue
+ * @return the received ClientMessage, null if the receive timed-out
+ */
+ public ClientMessage receiveMessage(String queueName) {
+ return receiveMessage(SimpleString.toSimpleString(queueName));
+ }
+
+ /**
+ * Receive a message from the specified queue using the specified receive timeout
+ *
+ * @param queueName name of the source queue
+ * @param timeout receive timeout in milliseconds
+ * @return the received ClientMessage, null if the receive timed-out
+ */
+ public ClientMessage receiveMessage(String queueName, long timeout) {
+ return receiveMessage(SimpleString.toSimpleString(queueName), timeout);
+ }
+
+ /**
+ * Receive a message from the specified queue using the default receive timeout
+ *
+ * @param queueName name of the source queue
+ * @return the received ClientMessage, null if the receive timed-out
+ */
+ public ClientMessage receiveMessage(SimpleString queueName) {
+ final boolean browseOnly = false;
+ return getInternalClient().receiveMessage(queueName, defaultReceiveTimeout, browseOnly);
+ }
+
+ /**
+ * Receive a message from the specified queue using the specified receive timeout
+ *
+ * @param queueName name of the source queue
+ * @param timeout receive timeout in milliseconds
+ * @return the received ClientMessage, null if the receive timed-out
+ */
+ public ClientMessage receiveMessage(SimpleString queueName, long timeout) {
+ final boolean browseOnly = false;
+ return getInternalClient().receiveMessage(queueName, timeout, browseOnly);
+ }
+
+ /**
+ * Browse a message (receive but do not consume) from the specified queue using the default receive timeout
+ *
+ * @param queueName name of the source queue
+ * @return the received ClientMessage, null if the receive timed-out
+ */
+ public ClientMessage browseMessage(String queueName) {
+ return browseMessage(SimpleString.toSimpleString(queueName), defaultReceiveTimeout);
+ }
+
+ /**
+ * Browse a message (receive but do not consume) a message from the specified queue using the specified receive timeout
+ *
+ * @param queueName name of the source queue
+ * @param timeout receive timeout in milliseconds
+ * @return the received ClientMessage, null if the receive timed-out
+ */
+ public ClientMessage browseMessage(String queueName, long timeout) {
+ return browseMessage(SimpleString.toSimpleString(queueName), timeout);
+ }
+
+ /**
+ * Browse a message (receive but do not consume) from the specified queue using the default receive timeout
+ *
+ * @param queueName name of the source queue
+ * @return the received ClientMessage, null if the receive timed-out
+ */
+ public ClientMessage browseMessage(SimpleString queueName) {
+ final boolean browseOnly = true;
+ return getInternalClient().receiveMessage(queueName, defaultReceiveTimeout, browseOnly);
+ }
+
+ /**
+ * Browse a message (receive but do not consume) a message from the specified queue using the specified receive timeout
+ *
+ * @param queueName name of the source queue
+ * @param timeout receive timeout in milliseconds
+ * @return the received ClientMessage, null if the receive timed-out
+ */
+ public ClientMessage browseMessage(SimpleString queueName, long timeout) {
+ final boolean browseOnly = true;
+ return getInternalClient().receiveMessage(queueName, timeout, browseOnly);
+ }
+
+ private InternalClient getInternalClient() {
+ if (internalClient == null) {
+ log.info("Creating Internal Client");
+ internalClient = new InternalClient();
+ internalClient.start();
+ }
+
+ return internalClient;
+ }
+
+ public static class EmbeddedActiveMQResourceException extends RuntimeException {
+
+ public EmbeddedActiveMQResourceException(String message) {
+ super(message);
+ }
+
+ public EmbeddedActiveMQResourceException(String message, Exception cause) {
+ super(message, cause);
+ }
+ }
+
+ private class InternalClient {
+
+ ServerLocator serverLocator;
+ ClientSessionFactory sessionFactory;
+ ClientSession session;
+ ClientProducer producer;
+
+ InternalClient() {
+ }
+
+ void start() {
+ log.info("Starting {}", this.getClass().getSimpleName());
+ try {
+ serverLocator = ActiveMQClient.createServerLocator(getVmURL());
+ sessionFactory = serverLocator.createSessionFactory();
+ }
+ catch (RuntimeException runtimeEx) {
+ throw runtimeEx;
+ }
+ catch (Exception ex) {
+ throw new EmbeddedActiveMQResourceException("Internal Client creation failure", ex);
+ }
+
+ try {
+ session = sessionFactory.createSession();
+ producer = session.createProducer((String) null);
+ session.start();
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResourceException("Internal Client creation failure", amqEx);
+ }
+ }
+
+ void stop() {
+ if (producer != null) {
+ try {
+ producer.close();
+ }
+ catch (ActiveMQException amqEx) {
+ log.warn("ActiveMQException encountered closing InternalClient ClientProducer - ignoring", amqEx);
+ }
+ finally {
+ producer = null;
+ }
+ }
+ if (session != null) {
+ try {
+ session.close();
+ }
+ catch (ActiveMQException amqEx) {
+ log.warn("ActiveMQException encountered closing InternalClient ClientSession - ignoring", amqEx);
+ }
+ finally {
+ session = null;
+ }
+ }
+ if (sessionFactory != null) {
+ sessionFactory.close();
+ sessionFactory = null;
+ }
+ if (serverLocator != null) {
+ serverLocator.close();
+ serverLocator = null;
+ }
+
+ }
+
+ public ClientMessage createMessage(boolean durable) {
+ checkSession();
+
+ return session.createMessage(durable);
+ }
+
+ public void sendMessage(SimpleString address, ClientMessage message) {
+ checkSession();
+ if (producer == null) {
+ throw new IllegalStateException("ClientProducer is null - has the InternalClient been started?");
+ }
+
+ try {
+ producer.send(address, message);
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResourceException(String.format("Failed to send message to %s", address.toString()), amqEx);
+ }
+ }
+
+ public ClientMessage receiveMessage(SimpleString address, long timeout, boolean browseOnly) {
+ checkSession();
+
+ ClientConsumer consumer = null;
+ try {
+ consumer = session.createConsumer(address, browseOnly);
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResourceException(String.format("Failed to create consumer for %s", address.toString()), amqEx);
+ }
+
+ ClientMessage message = null;
+ if (timeout > 0) {
+ try {
+ message = consumer.receive(timeout);
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive( timeout = %d ) for %s failed", timeout, address.toString()), amqEx);
+ }
+ }
+ else if (timeout == 0) {
+ try {
+ message = consumer.receiveImmediate();
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receiveImmediate() for %s failed", address.toString()), amqEx);
+ }
+ }
+ else {
+ try {
+ message = consumer.receive();
+ }
+ catch (ActiveMQException amqEx) {
+ throw new EmbeddedActiveMQResourceException(String.format("ClientConsumer.receive() for %s failed", address.toString()), amqEx);
+ }
+ }
+
+ return message;
+ }
+
+ void checkSession() {
+ getInternalClient();
+ if (session == null) {
+ throw new IllegalStateException("ClientSession is null - has the InternalClient been started?");
+ }
+ }
+ }
+}