You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/05 02:33:23 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5306

Repository: activemq
Updated Branches:
  refs/heads/trunk 7c04ead46 -> 6dd47bb63


https://issues.apache.org/jira/browse/AMQ-5306

 This closes #39

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6dd47bb6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6dd47bb6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6dd47bb6

Branch: refs/heads/trunk
Commit: 6dd47bb63fb37a4d1ff491188e75f613b6b81935
Parents: 7c04ead
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Aug 4 20:33:16 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Aug 4 20:33:16 2014 -0400

----------------------------------------------------------------------
 .../apache/activemq/tool/AbstractJmsClient.java | 119 ++++++++++---
 .../apache/activemq/tool/JmsConsumerClient.java |  18 +-
 .../apache/activemq/tool/JmsProducerClient.java |   4 +-
 .../activemq/tool/AbstractJmsClientTest.java    | 175 +++++++++++++++++++
 4 files changed, 282 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6dd47bb6/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
index 07e7c2f..d2e38ab 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java
@@ -16,12 +16,16 @@
  */
 package org.apache.activemq.tool;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Session;
 
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.tool.properties.JmsClientProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +34,10 @@ public abstract class AbstractJmsClient {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClient.class);
 
+    private static final String QUEUE_SCHEME = "queue://";
+    private static final String TOPIC_SCHEME = "topic://";
+    public static final String DESTINATION_SEPARATOR = ",";
+
     protected ConnectionFactory factory;
     protected Connection jmsConnection;
     protected Session jmsSession;
@@ -108,18 +116,52 @@ public abstract class AbstractJmsClient {
         return jmsSession;
     }
 
-    public Destination[] createDestination(int destIndex, int destCount) throws JMSException {
+    public Destination[] createDestinations(int destCount) throws JMSException {
+        final String destName = getClient().getDestName();
+        ArrayList<Destination> destinations = new ArrayList<>();
+        if (destName.contains(DESTINATION_SEPARATOR)) {
+            if (getClient().isDestComposite() && (destCount == 1)) {
+                // user was explicit about which destinations to make composite
+                String[] simpleNames = mapToSimpleNames(destName.split(DESTINATION_SEPARATOR));
+                String joinedSimpleNames = join(simpleNames, DESTINATION_SEPARATOR);
 
-        if (getClient().isDestComposite()) {
-            return new Destination[] {
-                createCompositeDestination(getClient().getDestName(), destIndex, destCount)
-            };
+                // use the type of the 1st destination for the Destination instance
+                byte destinationType = getDestinationType(destName);
+                destinations.add(createCompositeDestination(destinationType, joinedSimpleNames, 1));
+            } else {
+                LOG.info("User requested multiple destinations, splitting: {}", destName);
+                // either composite with multiple destinations to be suffixed
+                // or multiple non-composite destinations
+                String[] destinationNames = destName.split(DESTINATION_SEPARATOR);
+                for (String splitDestName : destinationNames) {
+                    addDestinations(destinations, splitDestName, destCount);
+                }
+            }
+        } else {
+            addDestinations(destinations, destName, destCount);
+        }
+        return destinations.toArray(new Destination[] {});
+    }
+
+    private String join(String[] stings, String separator) {
+        StringBuffer sb = new StringBuffer();
+        for (int i = 0; i < stings.length; i++) {
+            if (i > 0) {
+                sb.append(separator);
+            }
+            sb.append(stings[i]);
+        }
+        return sb.toString();
+    }
+
+    private void addDestinations(List<Destination> destinations, String destName, int destCount) throws JMSException {
+        boolean destComposite = getClient().isDestComposite();
+        if ((destComposite) && (destCount > 1)) {
+            destinations.add(createCompositeDestination(destName, destCount));
         } else {
-            Destination[] dest = new Destination[destCount];
             for (int i = 0; i < destCount; i++) {
-                dest[i] = createDestination(withDestinationSuffix(getClient().getDestName(), i, destCount));
+                destinations.add(createDestination(withDestinationSuffix(destName, i, destCount)));
             }
-            return dest;
         }
     }
 
@@ -127,20 +169,12 @@ public abstract class AbstractJmsClient {
         return (destCount == 1) ? name : name + "." + destIndex;
     }
 
-    public Destination createCompositeDestination(int destIndex, int destCount) throws JMSException {
-        return createCompositeDestination(getClient().getDestName(), destIndex, destCount);
+    protected Destination createCompositeDestination(String destName, int destCount) throws JMSException {
+        return createCompositeDestination(getDestinationType(destName), destName, destCount);
     }
 
-    protected Destination createCompositeDestination(String name, int destIndex, int destCount) throws JMSException {
-        String simpleName;
-
-        if (name.startsWith("queue://")) {
-            simpleName = name.substring("queue://".length());
-        } else if (name.startsWith("topic://")) {
-            simpleName = name.substring("topic://".length());
-        } else {
-            simpleName = name;
-        }
+    protected Destination createCompositeDestination(byte destinationType, String destName, int destCount) throws JMSException {
+        String simpleName = getSimpleName(destName);
 
         String compDestName = "";
         for (int i = 0; i < destCount; i++) {
@@ -150,16 +184,47 @@ public abstract class AbstractJmsClient {
             compDestName += withDestinationSuffix(simpleName, i, destCount);
         }
 
-        return createDestination(compDestName);
+        LOG.info("Creating composite destination: {}", compDestName);
+        return (destinationType == ActiveMQDestination.TOPIC_TYPE) ?
+            getSession().createTopic(compDestName) : getSession().createQueue(compDestName);
+    }
+
+    private String[] mapToSimpleNames(String[] destNames) {
+        assert (destNames != null);
+        String[] simpleNames = new String[destNames.length];
+        for (int i = 0; i < destNames.length; i++) {
+            simpleNames[i] = getSimpleName(destNames[i]);
+        }
+        return simpleNames;
+    }
+
+    private String getSimpleName(String destName) {
+        String simpleName;
+        if (destName.startsWith(QUEUE_SCHEME)) {
+            simpleName = destName.substring(QUEUE_SCHEME.length());
+        } else if (destName.startsWith(TOPIC_SCHEME)) {
+            simpleName = destName.substring(TOPIC_SCHEME.length());
+        } else {
+            simpleName = destName;
+        }
+        return simpleName;
+    }
+
+    private byte getDestinationType(String destName) {
+        assert (destName != null);
+        if (destName.startsWith(QUEUE_SCHEME)) {
+            return ActiveMQDestination.QUEUE_TYPE;
+        } else {
+            return ActiveMQDestination.TOPIC_TYPE;
+        }
     }
 
-    protected Destination createDestination(String name) throws JMSException {
-        if (name.startsWith("queue://")) {
-            return getSession().createQueue(name.substring("queue://".length()));
-        } else if (name.startsWith("topic://")) {
-            return getSession().createTopic(name.substring("topic://".length()));
+    protected Destination createDestination(String destName) throws JMSException {
+        String simpleName = getSimpleName(destName);
+        if (getDestinationType(destName) == ActiveMQDestination.QUEUE_TYPE) {
+            return getSession().createQueue(simpleName);
         } else {
-            return getSession().createTopic(name);
+            return getSession().createTopic(simpleName);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/6dd47bb6/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
index 7351d02..d660883 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java
@@ -26,6 +26,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Topic;
 
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.tool.properties.JmsClientProperties;
 import org.apache.activemq.tool.properties.JmsConsumerProperties;
 import org.slf4j.Logger;
@@ -208,12 +209,19 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
     }
 
     public MessageConsumer createJmsConsumer() throws JMSException {
-        Destination[] dest = createDestination(destIndex, destCount);
+        Destination[] dest = createDestinations(destCount);
 
-        if (this.client.getMessageSelector() == null)
-            return createJmsConsumer(dest[0]);
-        else
-            return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
+        Destination consumedDestination = dest[0];
+        if (dest.length > 1) {
+            String destinationName = ((ActiveMQDestination) consumedDestination).getPhysicalName();
+            LOG.warn("Multiple destinations requested for consumer; using only first: {}", destinationName);
+        }
+
+        if (this.client.getMessageSelector() == null) {
+            return createJmsConsumer(consumedDestination);
+        } else {
+            return createJmsConsumer(consumedDestination, this.client.getMessageSelector(), false);
+        }
     }
 
     public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/6dd47bb6/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
index e59a857..510d92d 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java
@@ -77,7 +77,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
     public void sendCountBasedMessages(long messageCount) throws JMSException {
         // Parse through different ways to send messages
         // Avoided putting the condition inside the loop to prevent effect on performance
-        Destination[] dest = createDestination(destIndex, destCount);
+        Destination[] dest = createDestinations(destCount);
 
         // Create a producer, if none is created.
         if (getJmsProducer() == null) {
@@ -165,7 +165,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
         // Parse through different ways to send messages
         // Avoided putting the condition inside the loop to prevent effect on performance
 
-        Destination[] dest = createDestination(destIndex, destCount);
+        Destination[] dest = createDestinations(destCount);
 
         // Create a producer, if none is created.
         if (getJmsProducer() == null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/6dd47bb6/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java
new file mode 100644
index 0000000..5e8f792
--- /dev/null
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java
@@ -0,0 +1,175 @@
+package org.apache.activemq.tool;
+
+import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
+import static org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE;
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.tool.properties.JmsClientProperties;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AbstractJmsClientTest {
+
+    public class NullJmsClient extends AbstractJmsClient {
+        private JmsClientProperties client;
+
+        public NullJmsClient(ConnectionFactory factory) {
+            super(factory);
+        }
+
+        @Override
+        public JmsClientProperties getClient() {
+            return client;
+        }
+
+        @Override
+        public void setClient(JmsClientProperties client) {
+            this.client = client;
+        }
+    }
+
+    private final String DEFAULT_DEST = "TEST.FOO";
+    private static BrokerService brokerService;
+    private static ActiveMQConnectionFactory connectionFactory;
+
+    private AbstractJmsClient jmsClient;
+    private JmsClientProperties clientProperties;
+
+    @BeforeClass
+    public static void setUpBrokerAndConnectionFactory() throws Exception {
+        brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+        brokerService.start();
+        connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    @AfterClass
+    public static void tearDownBroker() throws Exception {
+        brokerService.stop();
+    }
+
+    @Before
+    public void setUp() {
+        jmsClient = new NullJmsClient(connectionFactory);
+        clientProperties = new JmsClientProperties();
+        clientProperties.setDestName(DEFAULT_DEST);
+        jmsClient.setClient(clientProperties);
+    }
+
+    @Test
+    public void testCreateDestination() throws JMSException {
+        assertDestinationNameType("dest", TOPIC_TYPE,
+                asAmqDest(jmsClient.createDestination("dest")));
+    }
+
+    @Test
+    public void testCreateDestination_topic() throws JMSException {
+        assertDestinationNameType("dest", TOPIC_TYPE,
+                asAmqDest(jmsClient.createDestination("topic://dest")));
+    }
+
+    @Test
+    public void testCreateDestination_queue() throws JMSException {
+        assertDestinationNameType("dest", QUEUE_TYPE,
+                asAmqDest(jmsClient.createDestination("queue://dest")));
+    }
+
+    @Test
+    public void testCreateDestinations_commaSeparated() throws JMSException {
+        clientProperties.setDestName("queue://foo,topic://cheese");
+        Destination[] destinations = jmsClient.createDestinations(1);
+        assertEquals(2, destinations.length);
+        assertDestinationNameType("foo", QUEUE_TYPE, asAmqDest(destinations[0]));
+        assertDestinationNameType("cheese", TOPIC_TYPE, asAmqDest(destinations[1]));
+    }
+
+    @Test
+    public void testCreateDestinations_multipleComposite() throws JMSException {
+        clientProperties.setDestComposite(true);
+        clientProperties.setDestName("queue://foo,queue://cheese");
+        Destination[] destinations = jmsClient.createDestinations(1);
+        assertEquals(1, destinations.length);
+        // destCount is 1, so no suffixes are added; the simple names are joined into one composite
+        assertDestinationNameType("foo,cheese", QUEUE_TYPE, asAmqDest(destinations[0]));
+    }
+
+    @Test
+    public void testCreateDestinations() throws JMSException {
+        Destination[] destinations = jmsClient.createDestinations(1);
+        assertEquals(1, destinations.length);
+        assertDestinationNameType(DEFAULT_DEST, TOPIC_TYPE, asAmqDest(destinations[0]));
+    }
+
+    @Test
+    public void testCreateDestinations_multiple() throws JMSException {
+        Destination[] destinations = jmsClient.createDestinations(2);
+        assertEquals(2, destinations.length);
+        // suffixes should be added
+        assertDestinationNameType(DEFAULT_DEST + ".0", TOPIC_TYPE, asAmqDest(destinations[0]));
+        assertDestinationNameType(DEFAULT_DEST + ".1", TOPIC_TYPE, asAmqDest(destinations[1]));
+    }
+
+    @Test
+    public void testCreateDestinations_multipleCommaSeparated() throws JMSException {
+        clientProperties.setDestName("queue://foo,topic://cheese");
+        Destination[] destinations = jmsClient.createDestinations(2);
+        assertEquals(4, destinations.length);
+        // suffixes should be added
+        assertDestinationNameType("foo.0", QUEUE_TYPE, asAmqDest(destinations[0]));
+        assertDestinationNameType("foo.1", QUEUE_TYPE, asAmqDest(destinations[1]));
+        assertDestinationNameType("cheese.0", TOPIC_TYPE, asAmqDest(destinations[2]));
+        assertDestinationNameType("cheese.1", TOPIC_TYPE, asAmqDest(destinations[3]));
+    }
+
+    @Test
+    public void testCreateDestinations_composite() throws JMSException {
+        clientProperties.setDestComposite(true);
+        Destination[] destinations = jmsClient.createDestinations(2);
+        assertEquals(1, destinations.length);
+        // suffixes should be added
+        String expectedDestName = DEFAULT_DEST + ".0," + DEFAULT_DEST + ".1";
+        assertDestinationNameType(expectedDestName, TOPIC_TYPE, asAmqDest(destinations[0]));
+    }
+
+    @Test
+    public void testCreateDestinations_compositeQueue() throws JMSException {
+        clientProperties.setDestComposite(true);
+        clientProperties.setDestName("queue://" + DEFAULT_DEST);
+        Destination[] destinations = jmsClient.createDestinations(2);
+        assertEquals(1, destinations.length);
+        // suffixes should be added
+        String expectedDestName = DEFAULT_DEST + ".0," + DEFAULT_DEST + ".1";
+        assertDestinationNameType(expectedDestName, QUEUE_TYPE, asAmqDest(destinations[0]));
+    }
+
+    @Test
+    public void testCreateDestinations_compositeCommaSeparated() throws JMSException {
+        clientProperties.setDestComposite(true);
+        clientProperties.setDestName("queue://foo,topic://cheese");
+        Destination[] destinations = jmsClient.createDestinations(2);
+        assertEquals(2, destinations.length);
+
+        assertDestinationNameType("foo.0,foo.1", QUEUE_TYPE, asAmqDest(destinations[0]));
+        assertDestinationNameType("cheese.0,cheese.1", TOPIC_TYPE, asAmqDest(destinations[1]));
+    }
+
+    private void assertDestinationNameType(String physicalName, byte destinationType, ActiveMQDestination destination) {
+        assertEquals(destinationType, destination.getDestinationType());
+        assertEquals(physicalName, destination.getPhysicalName());
+    }
+
+    private ActiveMQDestination asAmqDest(Destination destination) {
+        return (ActiveMQDestination) destination;
+    }
+}