You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/05 02:33:23 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5306
Repository: activemq
Updated Branches:
refs/heads/trunk 7c04ead46 -> 6dd47bb63
https://issues.apache.org/jira/browse/AMQ-5306
This closes #39
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6dd47bb6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6dd47bb6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6dd47bb6
Branch: refs/heads/trunk
Commit: 6dd47bb63fb37a4d1ff491188e75f613b6b81935
Parents: 7c04ead
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Aug 4 20:33:16 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Aug 4 20:33:16 2014 -0400
----------------------------------------------------------------------
.../apache/activemq/tool/AbstractJmsClient.java | 119 ++++++++++---
.../apache/activemq/tool/JmsConsumerClient.java | 18 +-
.../apache/activemq/tool/JmsProducerClient.java | 4 +-
.../activemq/tool/AbstractJmsClientTest.java | 175 +++++++++++++++++++
4 files changed, 282 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6dd47bb6/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
index 07e7c2f..d2e38ab 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
@@ -16,12 +16,16 @@
*/
package org.apache.activemq.tool;
+import java.util.ArrayList;
+import java.util.List;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.tool.properties.JmsClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +34,10 @@ public abstract class AbstractJmsClient {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClient.class);
+ private static final String QUEUE_SCHEME = "queue://";
+ private static final String TOPIC_SCHEME = "topic://";
+ public static final String DESTINATION_SEPARATOR = ",";
+
protected ConnectionFactory factory;
protected Connection jmsConnection;
protected Session jmsSession;
@@ -108,18 +116,52 @@ public abstract class AbstractJmsClient {
return jmsSession;
}
- public Destination[] createDestination(int destIndex, int destCount) throws JMSException {
+ public Destination[] createDestinations(int destCount) throws JMSException {
+ final String destName = getClient().getDestName();
+ ArrayList<Destination> destinations = new ArrayList<>();
+ if (destName.contains(DESTINATION_SEPARATOR)) {
+ if (getClient().isDestComposite() && (destCount == 1)) {
+ // user was explicit about which destinations to make composite
+ String[] simpleNames = mapToSimpleNames(destName.split(DESTINATION_SEPARATOR));
+ String joinedSimpleNames = join(simpleNames, DESTINATION_SEPARATOR);
- if (getClient().isDestComposite()) {
- return new Destination[] {
- createCompositeDestination(getClient().getDestName(), destIndex, destCount)
- };
+ // use the type of the 1st destination for the Destination instance
+ byte destinationType = getDestinationType(destName);
+ destinations.add(createCompositeDestination(destinationType, joinedSimpleNames, 1));
+ } else {
+ LOG.info("User requested multiple destinations, splitting: {}", destName);
+ // either composite with multiple destinations to be suffixed
+ // or multiple non-composite destinations
+ String[] destinationNames = destName.split(DESTINATION_SEPARATOR);
+ for (String splitDestName : destinationNames) {
+ addDestinations(destinations, splitDestName, destCount);
+ }
+ }
+ } else {
+ addDestinations(destinations, destName, destCount);
+ }
+ return destinations.toArray(new Destination[] {});
+ }
+
+ private String join(String[] stings, String separator) {
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < stings.length; i++) {
+ if (i > 0) {
+ sb.append(separator);
+ }
+ sb.append(stings[i]);
+ }
+ return sb.toString();
+ }
+
+ private void addDestinations(List<Destination> destinations, String destName, int destCount) throws JMSException {
+ boolean destComposite = getClient().isDestComposite();
+ if ((destComposite) && (destCount > 1)) {
+ destinations.add(createCompositeDestination(destName, destCount));
} else {
- Destination[] dest = new Destination[destCount];
for (int i = 0; i < destCount; i++) {
- dest[i] = createDestination(withDestinationSuffix(getClient().getDestName(), i, destCount));
+ destinations.add(createDestination(withDestinationSuffix(destName, i, destCount)));
}
- return dest;
}
}
@@ -127,20 +169,12 @@ public abstract class AbstractJmsClient {
return (destCount == 1) ? name : name + "." + destIndex;
}
- public Destination createCompositeDestination(int destIndex, int destCount) throws JMSException {
- return createCompositeDestination(getClient().getDestName(), destIndex, destCount);
+ protected Destination createCompositeDestination(String destName, int destCount) throws JMSException {
+ return createCompositeDestination(getDestinationType(destName), destName, destCount);
}
- protected Destination createCompositeDestination(String name, int destIndex, int destCount) throws JMSException {
- String simpleName;
-
- if (name.startsWith("queue://")) {
- simpleName = name.substring("queue://".length());
- } else if (name.startsWith("topic://")) {
- simpleName = name.substring("topic://".length());
- } else {
- simpleName = name;
- }
+ protected Destination createCompositeDestination(byte destinationType, String destName, int destCount) throws JMSException {
+ String simpleName = getSimpleName(destName);
String compDestName = "";
for (int i = 0; i < destCount; i++) {
@@ -150,16 +184,47 @@ public abstract class AbstractJmsClient {
compDestName += withDestinationSuffix(simpleName, i, destCount);
}
- return createDestination(compDestName);
+ LOG.info("Creating composite destination: {}", compDestName);
+ return (destinationType == ActiveMQDestination.TOPIC_TYPE) ?
+ getSession().createTopic(compDestName) : getSession().createQueue(compDestName);
+ }
+
+ private String[] mapToSimpleNames(String[] destNames) {
+ assert (destNames != null);
+ String[] simpleNames = new String[destNames.length];
+ for (int i = 0; i < destNames.length; i++) {
+ simpleNames[i] = getSimpleName(destNames[i]);
+ }
+ return simpleNames;
+ }
+
+ private String getSimpleName(String destName) {
+ String simpleName;
+ if (destName.startsWith(QUEUE_SCHEME)) {
+ simpleName = destName.substring(QUEUE_SCHEME.length());
+ } else if (destName.startsWith(TOPIC_SCHEME)) {
+ simpleName = destName.substring(TOPIC_SCHEME.length());
+ } else {
+ simpleName = destName;
+ }
+ return simpleName;
+ }
+
+ private byte getDestinationType(String destName) {
+ assert (destName != null);
+ if (destName.startsWith(QUEUE_SCHEME)) {
+ return ActiveMQDestination.QUEUE_TYPE;
+ } else {
+ return ActiveMQDestination.TOPIC_TYPE;
+ }
}
- protected Destination createDestination(String name) throws JMSException {
- if (name.startsWith("queue://")) {
- return getSession().createQueue(name.substring("queue://".length()));
- } else if (name.startsWith("topic://")) {
- return getSession().createTopic(name.substring("topic://".length()));
+ protected Destination createDestination(String destName) throws JMSException {
+ String simpleName = getSimpleName(destName);
+ if (getDestinationType(destName) == ActiveMQDestination.QUEUE_TYPE) {
+ return getSession().createQueue(simpleName);
} else {
- return getSession().createTopic(name);
+ return getSession().createTopic(simpleName);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6dd47bb6/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
index 7351d02..d660883 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
@@ -26,6 +26,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.tool.properties.JmsClientProperties;
import org.apache.activemq.tool.properties.JmsConsumerProperties;
import org.slf4j.Logger;
@@ -208,12 +209,19 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
}
public MessageConsumer createJmsConsumer() throws JMSException {
- Destination[] dest = createDestination(destIndex, destCount);
+ Destination[] dest = createDestinations(destCount);
- if (this.client.getMessageSelector() == null)
- return createJmsConsumer(dest[0]);
- else
- return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
+ Destination consumedDestination = dest[0];
+ if (dest.length > 1) {
+ String destinationName = ((ActiveMQDestination) consumedDestination).getPhysicalName();
+ LOG.warn("Multiple destinations requested for consumer; using only first: {}", destinationName);
+ }
+
+ if (this.client.getMessageSelector() == null) {
+ return createJmsConsumer(consumedDestination);
+ } else {
+ return createJmsConsumer(consumedDestination, this.client.getMessageSelector(), false);
+ }
}
public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/6dd47bb6/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
index e59a857..510d92d 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
@@ -77,7 +77,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
public void sendCountBasedMessages(long messageCount) throws JMSException {
// Parse through different ways to send messages
// Avoided putting the condition inside the loop to prevent effect on performance
- Destination[] dest = createDestination(destIndex, destCount);
+ Destination[] dest = createDestinations(destCount);
// Create a producer, if none is created.
if (getJmsProducer() == null) {
@@ -165,7 +165,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
// Parse through different ways to send messages
// Avoided putting the condition inside the loop to prevent effect on performance
- Destination[] dest = createDestination(destIndex, destCount);
+ Destination[] dest = createDestinations(destCount);
// Create a producer, if none is created.
if (getJmsProducer() == null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/6dd47bb6/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java
new file mode 100644
index 0000000..5e8f792
--- /dev/null
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java
@@ -0,0 +1,175 @@
+package org.apache.activemq.tool;
+
+import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
+import static org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE;
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.tool.properties.JmsClientProperties;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AbstractJmsClientTest {
+
+ public class NullJmsClient extends AbstractJmsClient {
+ private JmsClientProperties client;
+
+ public NullJmsClient(ConnectionFactory factory) {
+ super(factory);
+ }
+
+ @Override
+ public JmsClientProperties getClient() {
+ return client;
+ }
+
+ @Override
+ public void setClient(JmsClientProperties client) {
+ this.client = client;
+ }
+ }
+
+ private final String DEFAULT_DEST = "TEST.FOO";
+ private static BrokerService brokerService;
+ private static ActiveMQConnectionFactory connectionFactory;
+
+ private AbstractJmsClient jmsClient;
+ private JmsClientProperties clientProperties;
+
+ @BeforeClass
+ public static void setUpBrokerAndConnectionFactory() throws Exception {
+ brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+ brokerService.start();
+ connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+ }
+
+ @AfterClass
+ public static void tearDownBroker() throws Exception {
+ brokerService.stop();
+ }
+
+ @Before
+ public void setUp() {
+ jmsClient = new NullJmsClient(connectionFactory);
+ clientProperties = new JmsClientProperties();
+ clientProperties.setDestName(DEFAULT_DEST);
+ jmsClient.setClient(clientProperties);
+ }
+
+ @Test
+ public void testCreateDestination() throws JMSException {
+ assertDestinationNameType("dest", TOPIC_TYPE,
+ asAmqDest(jmsClient.createDestination("dest")));
+ }
+
+ @Test
+ public void testCreateDestination_topic() throws JMSException {
+ assertDestinationNameType("dest", TOPIC_TYPE,
+ asAmqDest(jmsClient.createDestination("topic://dest")));
+ }
+
+ @Test
+ public void testCreateDestination_queue() throws JMSException {
+ assertDestinationNameType("dest", QUEUE_TYPE,
+ asAmqDest(jmsClient.createDestination("queue://dest")));
+ }
+
+ @Test
+ public void testCreateDestinations_commaSeparated() throws JMSException {
+ clientProperties.setDestName("queue://foo,topic://cheese");
+ Destination[] destinations = jmsClient.createDestinations(1);
+ assertEquals(2, destinations.length);
+ assertDestinationNameType("foo", QUEUE_TYPE, asAmqDest(destinations[0]));
+ assertDestinationNameType("cheese", TOPIC_TYPE, asAmqDest(destinations[1]));
+ }
+
+ @Test
+ public void testCreateDestinations_multipleComposite() throws JMSException {
+ clientProperties.setDestComposite(true);
+ clientProperties.setDestName("queue://foo,queue://cheese");
+ Destination[] destinations = jmsClient.createDestinations(1);
+ assertEquals(1, destinations.length);
+ // no suffixes are added when destCount is 1; the simple names are joined as-is
+ assertDestinationNameType("foo,cheese", QUEUE_TYPE, asAmqDest(destinations[0]));
+ }
+
+ @Test
+ public void testCreateDestinations() throws JMSException {
+ Destination[] destinations = jmsClient.createDestinations(1);
+ assertEquals(1, destinations.length);
+ assertDestinationNameType(DEFAULT_DEST, TOPIC_TYPE, asAmqDest(destinations[0]));
+ }
+
+ @Test
+ public void testCreateDestinations_multiple() throws JMSException {
+ Destination[] destinations = jmsClient.createDestinations(2);
+ assertEquals(2, destinations.length);
+ // suffixes should be added
+ assertDestinationNameType(DEFAULT_DEST + ".0", TOPIC_TYPE, asAmqDest(destinations[0]));
+ assertDestinationNameType(DEFAULT_DEST + ".1", TOPIC_TYPE, asAmqDest(destinations[1]));
+ }
+
+ @Test
+ public void testCreateDestinations_multipleCommaSeparated() throws JMSException {
+ clientProperties.setDestName("queue://foo,topic://cheese");
+ Destination[] destinations = jmsClient.createDestinations(2);
+ assertEquals(4, destinations.length);
+ // suffixes should be added
+ assertDestinationNameType("foo.0", QUEUE_TYPE, asAmqDest(destinations[0]));
+ assertDestinationNameType("foo.1", QUEUE_TYPE, asAmqDest(destinations[1]));
+ assertDestinationNameType("cheese.0", TOPIC_TYPE, asAmqDest(destinations[2]));
+ assertDestinationNameType("cheese.1", TOPIC_TYPE, asAmqDest(destinations[3]));
+ }
+
+ @Test
+ public void testCreateDestinations_composite() throws JMSException {
+ clientProperties.setDestComposite(true);
+ Destination[] destinations = jmsClient.createDestinations(2);
+ assertEquals(1, destinations.length);
+ // suffixes should be added
+ String expectedDestName = DEFAULT_DEST + ".0," + DEFAULT_DEST + ".1";
+ assertDestinationNameType(expectedDestName, TOPIC_TYPE, asAmqDest(destinations[0]));
+ }
+
+ @Test
+ public void testCreateDestinations_compositeQueue() throws JMSException {
+ clientProperties.setDestComposite(true);
+ clientProperties.setDestName("queue://" + DEFAULT_DEST);
+ Destination[] destinations = jmsClient.createDestinations(2);
+ assertEquals(1, destinations.length);
+ // suffixes should be added
+ String expectedDestName = DEFAULT_DEST + ".0," + DEFAULT_DEST + ".1";
+ assertDestinationNameType(expectedDestName, QUEUE_TYPE, asAmqDest(destinations[0]));
+ }
+
+ @Test
+ public void testCreateDestinations_compositeCommaSeparated() throws JMSException {
+ clientProperties.setDestComposite(true);
+ clientProperties.setDestName("queue://foo,topic://cheese");
+ Destination[] destinations = jmsClient.createDestinations(2);
+ assertEquals(2, destinations.length);
+
+ assertDestinationNameType("foo.0,foo.1", QUEUE_TYPE, asAmqDest(destinations[0]));
+ assertDestinationNameType("cheese.0,cheese.1", TOPIC_TYPE, asAmqDest(destinations[1]));
+ }
+
+ private void assertDestinationNameType(String physicalName, byte destinationType, ActiveMQDestination destination) {
+ assertEquals(destinationType, destination.getDestinationType());
+ assertEquals(physicalName, destination.getPhysicalName());
+ }
+
+ private ActiveMQDestination asAmqDest(Destination destination) {
+ return (ActiveMQDestination) destination;
+ }
+}