You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/08/22 17:09:45 UTC

svn commit: r568644 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/ src/test/java/org/apache/activemq/

Author: jstrachan
Date: Wed Aug 22 08:09:44 2007
New Revision: 568644

URL: http://svn.apache.org/viewvc?rev=568644&view=rev
Log:
allow a MessageListener to be specified when creating a consumer; to avoid threading issues when creating lots of consumers after the connection has started

Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Aug 22 08:09:44 2007
@@ -390,6 +390,18 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-clean-plugin</artifactId>
+        <configuration>
+          <filesets>
+            <fileset>
+              <directory>${basedir}/activemq-data</directory>
+            </fileset>
+          </filesets>
+        </configuration>
+      </plugin>
+      
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
 
         <executions>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Aug 22 08:09:44 2007
@@ -136,12 +136,13 @@
      * @param noLocal
      * @param browser
      * @param dispatchAsync
+     * @param messageListener
      * @throws JMSException
      */
     public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
-                                   String name, String selector, int prefetch,
-                                   int maximumPendingMessageCount, boolean noLocal, boolean browser,
-                                   boolean dispatchAsync) throws JMSException {
+            String name, String selector, int prefetch,
+            int maximumPendingMessageCount, boolean noLocal, boolean browser,
+            boolean dispatchAsync, MessageListener messageListener) throws JMSException {
         if (dest == null) {
             throw new InvalidDestinationException("Don't understand null destinations");
         } else if (dest.getPhysicalName() == null) {
@@ -206,6 +207,10 @@
         this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
                                    && !info.isBrowser();
         this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
+
+        if (messageListener != null) {
+            setMessageListener(messageListener);
+        }
         try {
             this.session.addConsumer(this);
             this.session.syncSendPacket(info);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java Wed Aug 22 08:09:44 2007
@@ -96,7 +96,7 @@
         browseDone.set(false);
         ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
         return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
-            .getMaximumPendingMessageLimit(), false, true, dispatchAsync) {
+            .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
             public void dispatch(MessageDispatch md) {
                 if (md.getMessage() == null) {
                     browseDone.set(true);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java Wed Aug 22 08:09:44 2007
@@ -68,7 +68,7 @@
                                     int maximumPendingMessageCount, boolean asyncDispatch)
         throws JMSException {
         super(theSession, consumerId, destination, null, selector, prefetch, maximumPendingMessageCount,
-              false, false, asyncDispatch);
+              false, false, asyncDispatch, null);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Aug 22 08:09:44 2007
@@ -16,79 +16,33 @@
  */
 package org.apache.activemq;
 
-import java.io.File;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
 import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.blob.BlobUploader;
-import org.apache.activemq.command.ActiveMQBlobMessage;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQStreamMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
 import org.apache.activemq.management.JMSSessionStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.Usage;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.jms.Message;
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * <P>
  * A <CODE>Session</CODE> object is a single-threaded context for producing
@@ -866,8 +820,7 @@
      * @since 1.1
      */
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
-        checkClosed();
-        return createConsumer(destination, null);
+        return createConsumer(destination, (String) null);
     }
 
     /**
@@ -894,38 +847,54 @@
      * @since 1.1
      */
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
-        checkClosed();
-
-        if (destination instanceof CustomDestination) {
-            CustomDestination customDestination = (CustomDestination)destination;
-            return customDestination.createConsumer(this, messageSelector);
-        }
-
-        int prefetch = 0;
-
-        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
-        if (destination instanceof Topic) {
-            prefetch = prefetchPolicy.getTopicPrefetch();
-        } else {
-            prefetch = prefetchPolicy.getQueuePrefetch();
-        }
-
-        return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch,
-                                           prefetchPolicy.getMaximumPendingMessageLimit(), false, false, asyncDispatch);
+        return createConsumer(destination, messageSelector, false);
     }
 
     /**
-     * @return
+     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
+     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
+     * <CODE>Destination</CODE>, they can be used in the destination
+     * parameter to create a <CODE>MessageConsumer</CODE>.
+     *
+     * @param destination the <CODE>Destination</CODE> to access.
+     * @param messageListener the listener to use for async consumption of messages
+     * @return the MessageConsumer
+     * @throws JMSException if the session fails to create a consumer due to
+     *                 some internal error.
+     * @throws InvalidDestinationException if an invalid destination is
+     *                 specified.
+     * @since 1.1
      */
-    protected ConsumerId getNextConsumerId() {
-        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
+    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
+        return createConsumer(destination, null, messageListener);
     }
 
     /**
-     * @return
+     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
+     * using a message selector. Since <CODE> Queue</CODE> and
+     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
+     * can be used in the destination parameter to create a
+     * <CODE>MessageConsumer</CODE>.
+     * <P>
+     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
+     * that have been sent to a destination.
+     *
+     * @param destination the <CODE>Destination</CODE> to access
+     * @param messageSelector only messages with properties matching the message
+     *                selector expression are delivered. A value of null or an
+     *                empty string indicates that there is no message selector
+     *                for the message consumer.
+     * @param messageListener the listener to use for async consumption of messages
+     * @return the MessageConsumer
+     * @throws JMSException if the session fails to create a MessageConsumer due
+     *                 to some internal error.
+     * @throws InvalidDestinationException if an invalid destination is
+     *                 specified.
+     * @throws InvalidSelectorException if the message selector is invalid.
+     * @since 1.1
      */
-    protected ProducerId getNextProducerId() {
-        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
+        return createConsumer(destination, messageSelector, false, messageListener);
     }
 
     /**
@@ -965,6 +934,47 @@
      * @since 1.1
      */
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
+        return createConsumer(destination, messageSelector, noLocal, null);
+    }
+
+    /**
+     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
+     * using a message selector. This method can specify whether messages
+     * published by its own connection should be delivered to it, if the
+     * destination is a topic.
+     * <P>
+     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
+     * <CODE>Destination</CODE>, they can be used in the destination
+     * parameter to create a <CODE>MessageConsumer</CODE>.
+     * <P>
+     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
+     * that have been published to a destination.
+     * <P>
+     * In some cases, a connection may both publish and subscribe to a topic.
+     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
+     * inhibit the delivery of messages published by its own connection. The
+     * default value for this attribute is False. The <CODE>noLocal</CODE>
+     * value must be supported by destinations that are topics.
+     *
+     * @param destination the <CODE>Destination</CODE> to access
+     * @param messageSelector only messages with properties matching the message
+     *                selector expression are delivered. A value of null or an
+     *                empty string indicates that there is no message selector
+     *                for the message consumer.
+     * @param noLocal - if true, and the destination is a topic, inhibits the
+     *                delivery of messages published by its own connection. The
+     *                behavior for <CODE>NoLocal</CODE> is not specified if
+     *                the destination is a queue.
+     * @param messageListener the listener to use for async consumption of messages
+     * @return the MessageConsumer
+     * @throws JMSException if the session fails to create a MessageConsumer due
+     *                 to some internal error.
+     * @throws InvalidDestinationException if an invalid destination is
+     *                 specified.
+     * @throws InvalidSelectorException if the message selector is invalid.
+     * @since 1.1
+     */
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
         checkClosed();
 
         if (destination instanceof CustomDestination) {
@@ -973,8 +983,15 @@
         }
 
         ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
-        return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetchPolicy
-            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
+        int prefetch = 0;
+        if (destination instanceof Topic) {
+            prefetch = prefetchPolicy.getTopicPrefetch();
+        } else {
+            prefetch = prefetchPolicy.getQueuePrefetch();
+        }
+        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
+        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
+                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch, messageListener);
     }
 
     /**
@@ -1519,6 +1536,20 @@
      */
     protected SessionId getSessionId() {
         return info.getSessionId();
+    }
+
+    /**
+     * @return
+     */
+    protected ConsumerId getNextConsumerId() {
+        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
+    }
+
+    /**
+     * @return
+     */
+    protected ProducerId getNextProducerId() {
+        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java Wed Aug 22 08:09:44 2007
@@ -114,7 +114,7 @@
     protected ActiveMQTopicSubscriber(ActiveMQSession theSession,
                                       ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount,
                                       boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException {
-        super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch);
+        super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch, null);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Wed Aug 22 08:09:44 2007
@@ -251,6 +251,38 @@
         assertEquals(4, counter.get());
     }
 
+    public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+    }
+
+    public void testPassMessageListenerIntoCreateConsumer() throws Exception {
+
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Receive a message with the JMS API
+        connection.start();
+        ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination, new MessageListener() {
+            public void onMessage(Message m) {
+                counter.incrementAndGet();
+                if (counter.get() == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
     public void initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
         addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),