You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/08/22 17:09:45 UTC
svn commit: r568644 - in /activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/ src/test/java/org/apache/activemq/
Author: jstrachan
Date: Wed Aug 22 08:09:44 2007
New Revision: 568644
URL: http://svn.apache.org/viewvc?rev=568644&view=rev
Log:
allow a MessageListener to be specified when creating a consumer; to avoid threading issues when creating lots of consumers after the connection has started
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Aug 22 08:09:44 2007
@@ -390,6 +390,18 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>${basedir}/activemq-data</directory>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Aug 22 08:09:44 2007
@@ -136,12 +136,13 @@
* @param noLocal
* @param browser
* @param dispatchAsync
+ * @param messageListener
* @throws JMSException
*/
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
- String name, String selector, int prefetch,
- int maximumPendingMessageCount, boolean noLocal, boolean browser,
- boolean dispatchAsync) throws JMSException {
+ String name, String selector, int prefetch,
+ int maximumPendingMessageCount, boolean noLocal, boolean browser,
+ boolean dispatchAsync, MessageListener messageListener) throws JMSException {
if (dest == null) {
throw new InvalidDestinationException("Don't understand null destinations");
} else if (dest.getPhysicalName() == null) {
@@ -206,6 +207,10 @@
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge()
&& !info.isBrowser();
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
+
+ if (messageListener != null) {
+ setMessageListener(messageListener);
+ }
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueBrowser.java Wed Aug 22 08:09:44 2007
@@ -96,7 +96,7 @@
browseDone.set(false);
ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
- .getMaximumPendingMessageLimit(), false, true, dispatchAsync) {
+ .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
public void dispatch(MessageDispatch md) {
if (md.getMessage() == null) {
browseDone.set(true);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueReceiver.java Wed Aug 22 08:09:44 2007
@@ -68,7 +68,7 @@
int maximumPendingMessageCount, boolean asyncDispatch)
throws JMSException {
super(theSession, consumerId, destination, null, selector, prefetch, maximumPendingMessageCount,
- false, false, asyncDispatch);
+ false, false, asyncDispatch, null);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Aug 22 08:09:44 2007
@@ -16,79 +16,33 @@
*/
package org.apache.activemq;
-import java.io.File;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader;
-import org.apache.activemq.command.ActiveMQBlobMessage;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQStreamMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.*;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.Usage;
-import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.jms.Message;
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* <P>
* A <CODE>Session</CODE> object is a single-threaded context for producing
@@ -866,8 +820,7 @@
* @since 1.1
*/
public MessageConsumer createConsumer(Destination destination) throws JMSException {
- checkClosed();
- return createConsumer(destination, null);
+ return createConsumer(destination, (String) null);
}
/**
@@ -894,38 +847,54 @@
* @since 1.1
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
- checkClosed();
-
- if (destination instanceof CustomDestination) {
- CustomDestination customDestination = (CustomDestination)destination;
- return customDestination.createConsumer(this, messageSelector);
- }
-
- int prefetch = 0;
-
- ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
- if (destination instanceof Topic) {
- prefetch = prefetchPolicy.getTopicPrefetch();
- } else {
- prefetch = prefetchPolicy.getQueuePrefetch();
- }
-
- return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetch,
- prefetchPolicy.getMaximumPendingMessageLimit(), false, false, asyncDispatch);
+ return createConsumer(destination, messageSelector, false);
}
/**
- * @return
+ * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
+ * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
+ * <CODE>Destination</CODE>, they can be used in the destination
+ * parameter to create a <CODE>MessageConsumer</CODE>.
+ *
+ * @param destination the <CODE>Destination</CODE> to access.
+ * @param messageListener the listener to use for async consumption of messages
+ * @return the MessageConsumer
+ * @throws JMSException if the session fails to create a consumer due to
+ * some internal error.
+ * @throws InvalidDestinationException if an invalid destination is
+ * specified.
+ * @since 1.1
*/
- protected ConsumerId getNextConsumerId() {
- return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
+ public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
+ return createConsumer(destination, null, messageListener);
}
/**
- * @return
+ * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
+ * using a message selector. Since <CODE> Queue</CODE> and
+ * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
+ * can be used in the destination parameter to create a
+ * <CODE>MessageConsumer</CODE>.
+ * <P>
+ * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
+ * that have been sent to a destination.
+ *
+ * @param destination the <CODE>Destination</CODE> to access
+ * @param messageSelector only messages with properties matching the message
+ * selector expression are delivered. A value of null or an
+ * empty string indicates that there is no message selector
+ * for the message consumer.
+ * @param messageListener the listener to use for async consumption of messages
+ * @return the MessageConsumer
+ * @throws JMSException if the session fails to create a MessageConsumer due
+ * to some internal error.
+ * @throws InvalidDestinationException if an invalid destination is
+ * specified.
+ * @throws InvalidSelectorException if the message selector is invalid.
+ * @since 1.1
*/
- protected ProducerId getNextProducerId() {
- return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
+ public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
+ return createConsumer(destination, messageSelector, false, messageListener);
}
/**
@@ -965,6 +934,47 @@
* @since 1.1
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
+ return createConsumer(destination, messageSelector, noLocal, null);
+ }
+
+ /**
+ * Creates <CODE>MessageConsumer</CODE> for the specified destination,
+ * using a message selector. This method can specify whether messages
+ * published by its own connection should be delivered to it, if the
+ * destination is a topic.
+ * <P>
+ * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
+ * <CODE>Destination</CODE>, they can be used in the destination
+ * parameter to create a <CODE>MessageConsumer</CODE>.
+ * <P>
+ * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
+ * that have been published to a destination.
+ * <P>
+ * In some cases, a connection may both publish and subscribe to a topic.
+ * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
+ * inhibit the delivery of messages published by its own connection. The
+ * default value for this attribute is False. The <CODE>noLocal</CODE>
+ * value must be supported by destinations that are topics.
+ *
+ * @param destination the <CODE>Destination</CODE> to access
+ * @param messageSelector only messages with properties matching the message
+ * selector expression are delivered. A value of null or an
+ * empty string indicates that there is no message selector
+ * for the message consumer.
+ * @param noLocal - if true, and the destination is a topic, inhibits the
+ * delivery of messages published by its own connection. The
+ * behavior for <CODE>NoLocal</CODE> is not specified if
+ * the destination is a queue.
+ * @param messageListener the listener to use for async consumption of messages
+ * @return the MessageConsumer
+ * @throws JMSException if the session fails to create a MessageConsumer due
+ * to some internal error.
+ * @throws InvalidDestinationException if an invalid destination is
+ * specified.
+ * @throws InvalidSelectorException if the message selector is invalid.
+ * @since 1.1
+ */
+ public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
checkClosed();
if (destination instanceof CustomDestination) {
@@ -973,8 +983,15 @@
}
ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
- return new ActiveMQMessageConsumer(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector, prefetchPolicy
- .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
+ int prefetch = 0;
+ if (destination instanceof Topic) {
+ prefetch = prefetchPolicy.getTopicPrefetch();
+ } else {
+ prefetch = prefetchPolicy.getQueuePrefetch();
+ }
+ ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
+ return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
+ prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch, messageListener);
}
/**
@@ -1519,6 +1536,20 @@
*/
protected SessionId getSessionId() {
return info.getSessionId();
+ }
+
+ /**
+ * @return
+ */
+ protected ConsumerId getNextConsumerId() {
+ return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
+ }
+
+ /**
+ * @return
+ */
+ protected ProducerId getNextProducerId() {
+ return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicSubscriber.java Wed Aug 22 08:09:44 2007
@@ -114,7 +114,7 @@
protected ActiveMQTopicSubscriber(ActiveMQSession theSession,
ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount,
boolean noLocalValue, boolean browserValue, boolean asyncDispatch) throws JMSException {
- super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch);
+ super(theSession, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocalValue, browserValue, asyncDispatch, null);
}
/**
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=568644&r1=568643&r2=568644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Wed Aug 22 08:09:44 2007
@@ -251,6 +251,38 @@
assertEquals(4, counter.get());
}
+ public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
+ addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+ }
+
+ public void testPassMessageListenerIntoCreateConsumer() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch done = new CountDownLatch(1);
+
+ // Receive a message with the JMS API
+ connection.start();
+ ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination, new MessageListener() {
+ public void onMessage(Message m) {
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done.countDown();
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+ Thread.sleep(200);
+
+ // Make sure only 4 messages were delivered.
+ assertEquals(4, counter.get());
+ }
+
public void initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),