You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2003/06/06 12:51:09 UTC
cvs commit: jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger MessengerSupport.java
jstrachan 2003/06/06 03:51:09
Modified: messenger/src/java/org/apache/commons/messenger
MessengerSupport.java
Log:
Added a log message when Destination objects are created.
Also fixed a bug where the noLocal flag wasn't correctly being passed through.
Revision Changes Path
1.35 +172 -131 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
Index: MessengerSupport.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -r1.34 -r1.35
--- MessengerSupport.java 4 Mar 2003 10:21:00 -0000 1.34
+++ MessengerSupport.java 6 Jun 2003 10:51:08 -0000 1.35
@@ -54,6 +54,8 @@
/** Logger */
private static final Log log = LogFactory.getLog(MessengerSupport.class);
+ private static final Log destinationLog = LogFactory.getLog("org.apache.commons.messenger.destination");
+
private static final boolean CACHE_REQUESTOR = true;
@@ -65,38 +67,39 @@
* or wether they should be created on the fly
*/
private boolean jndiDestinations;
-
+
/** are topic subscribers durable? */
private boolean durable;
- /** the delivery mode used by default when sending messages */
- private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
-
+ /** the delivery mode used by default when sending messages */
+ private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
+
/** the durable name used for durable topic based subscriptions */
private String durableName;
-
+
/**
* whether local messages are ignored when topic based subscription is used
* with a message selector
*/
private boolean noLocal;
-
+
/** Should we cache the requestor object per thread? */
private boolean cacheRequestors;
/** A Map of ListenerKey objects to MessageConsumer objects */
private Map listeners = new HashMap();
-
+
/** whether MessageProducer instances should be cached or not */
private boolean cacheProducers = true;
-
+
public MessengerSupport() {
}
-
+
public String toString() {
try {
Session session = borrowSession();
- String answer = super.toString() + " session: " + session.toString();
+ String answer =
+ super.toString() + " session: " + session.toString();
returnSession(session);
return answer;
}
@@ -104,14 +107,21 @@
return super.toString() + " session: " + e.toString();
}
}
-
+
public Destination getDestination(String subject) throws JMSException {
Session session = borrowSession();
try {
+ boolean debug = destinationLog.isInfoEnabled();
if (isTopic(session)) {
+ if (debug) {
+ destinationLog.info("Using topic: " + subject);
+ }
return getTopic((TopicSession) session, subject);
}
else {
+ if (debug) {
+ destinationLog.info("Using queue: " + subject);
+ }
return getQueue((QueueSession) session, subject);
}
}
@@ -119,7 +129,7 @@
returnSession(session);
}
}
-
+
public Destination createTemporaryDestination() throws JMSException {
Session session = borrowSession();
try {
@@ -136,7 +146,7 @@
returnSession(session);
}
}
-
+
public void send(Destination destination, Message message)
throws JMSException {
Session session = borrowSession();
@@ -156,14 +166,14 @@
}
}
- public Message call( Destination destination, Message message ) throws JMSException {
+ public Message call(Destination destination, Message message)
+ throws JMSException {
Session session = borrowSession();
MessageProducer producer = null;
try {
Destination replyTo = getReplyToDestination();
message.setJMSReplyTo(replyTo);
-
// NOTE - we could consider adding a correlation ID per request so that we can ignore
// any cruft or old messages that are sent onto our inbound queue.
//
@@ -174,17 +184,17 @@
//
// Maybe this should be a configurable strategy
- producer = borrowMessageProducer( session, destination );
+ producer = borrowMessageProducer(session, destination);
MessageConsumer consumer = getReplyToConsumer();
if (isTopic(producer)) {
- ((TopicPublisher) producer).publish( message );
+ ((TopicPublisher) producer).publish(message);
}
else {
- ((QueueSender) producer).send( message );
+ ((QueueSender) producer).send(message);
}
Message response = consumer.receive();
- if ( response == null ) {
+ if (response == null) {
// we could have timed out so lets trash the temporary destination
// so that the next call() method will use a new destination to avoid
// the response for this call() coming back on later call() invokcations
@@ -194,10 +204,10 @@
}
finally {
returnMessageProducer(producer);
- returnSession( session );
+ returnSession(session);
}
}
-
+
public Message call(
Destination destination,
Message message,
@@ -208,7 +218,7 @@
try {
Destination replyTo = getReplyToDestination();
message.setJMSReplyTo(replyTo);
-
+
// NOTE - we could consider adding a correlation ID per request so that we can ignore
// any cruft or old messages that are sent onto our inbound queue.
//
@@ -218,9 +228,9 @@
// and use correlation IDs to dispatch to the corrent thread
//
// Maybe this should be a configurable strategy
-
+
producer = borrowMessageProducer(session, destination);
-
+
MessageConsumer consumer = getReplyToConsumer();
if (isTopic(producer)) {
((TopicPublisher) producer).publish(message);
@@ -229,7 +239,7 @@
((QueueSender) producer).send(message);
}
Message response = consumer.receive(timeoutMillis);
- if ( response == null ) {
+ if (response == null) {
// we could have timed out so lets trash the temporary destination
// so that the next call() method will use a new destination to avoid
// the response for this call() coming back on later call() invokcations
@@ -242,7 +252,7 @@
returnSession(session);
}
}
-
+
public Message receive(Destination destination) throws JMSException {
Session session = borrowSession();
MessageConsumer consumer = null;
@@ -339,7 +349,9 @@
}
}
- public MessageConsumer createConsumer(Destination destination, String selector)
+ public MessageConsumer createConsumer(
+ Destination destination,
+ String selector)
throws JMSException {
Session session = borrowSession();
try {
@@ -367,7 +379,11 @@
ServerSessionPool sessionPool,
int maxMessages)
throws JMSException {
- return createConnectionConsumer(destination, null, sessionPool, maxMessages);
+ return createConnectionConsumer(
+ destination,
+ null,
+ sessionPool,
+ maxMessages);
}
public ConnectionConsumer createConnectionConsumer(
@@ -407,7 +423,6 @@
public abstract Connection getConnection() throws JMSException;
-
// Listener API
//-------------------------------------------------------------------------
public void addListener(Destination destination, MessageListener listener)
@@ -418,7 +433,8 @@
}
Session session = borrowListenerSession();
try {
- MessageConsumer consumer = createMessageConsumer(session, destination);
+ MessageConsumer consumer =
+ createMessageConsumer(session, destination);
consumer.setMessageListener(listener);
ListenerKey key = new ListenerKey(destination, listener);
listeners.put(key, consumer);
@@ -451,7 +467,9 @@
}
}
- public void removeListener(Destination destination, MessageListener listener)
+ public void removeListener(
+ Destination destination,
+ MessageListener listener)
throws JMSException {
ListenerKey key = new ListenerKey(destination, listener);
MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
@@ -584,18 +602,19 @@
/**
* Creates a browser on the given Queue
- */
- public QueueBrowser createBrowser(Destination destination) throws JMSException {
+ */
+ public QueueBrowser createBrowser(Destination destination)
+ throws JMSException {
Session session = borrowSession();
QueueBrowser browser = null;
try {
return createBrowser(session, destination);
- }
+ }
finally {
returnSession(session);
}
}
-
+
/** Get the producer's default delivery mode. */
public int getDeliveryMode(Destination destination) throws JMSException {
Session session = borrowSession();
@@ -709,7 +728,9 @@
}
/** Set whether message timestamps are disabled. */
- public void setDisableMessageTimestamp(Destination destination, boolean value)
+ public void setDisableMessageTimestamp(
+ Destination destination,
+ boolean value)
throws JMSException {
Session session = borrowSession();
MessageProducer producer = null;
@@ -743,7 +764,11 @@
timeToLive);
}
else {
- ((QueueSender) producer).send(message, deliveryMode, priority, timeToLive);
+ ((QueueSender) producer).send(
+ message,
+ deliveryMode,
+ priority,
+ timeToLive);
}
}
finally {
@@ -784,7 +809,6 @@
}
}
-
// Properties
//-------------------------------------------------------------------------
/** Gets the name that this Messenger is called in a MessengerManager */
@@ -821,13 +845,12 @@
public boolean isCacheRequestors() {
return cacheRequestors;
}
-
+
/** Sets whether we should cache the requestor object per thread? */
public void setCacheRequestors(boolean cacheRequestors) {
this.cacheRequestors = cacheRequestors;
}
-
/** @return the durable name used for durable topic based subscriptions */
public String getDurableName() {
return durableName;
@@ -845,7 +868,7 @@
public boolean isNoLocal() {
return noLocal;
}
-
+
/**
* Sets whether local messages are ignored when topic based subscription is used
* with a message selector
@@ -853,7 +876,7 @@
public void setNoLocal(boolean noLocal) {
this.noLocal = noLocal;
}
-
+
/** Gets whether MessageProducer instances should be cached or not, which defaults to true */
public boolean isCacheProducers() {
return cacheProducers;
@@ -864,96 +887,99 @@
this.cacheProducers = cacheProducers;
}
- /**
- * Returns the delivery mode used on messages sent via this Messenger
- * @return int
- */
- public int getDeliveryMode() {
- return deliveryMode;
- }
-
- /**
- * Sets the delivery mode used on messages sent via this Messenger
- * @param deliveryMode The deliveryMode to set
- */
- public void setDeliveryMode(int deliveryMode) {
- this.deliveryMode = deliveryMode;
- }
+ /**
+ * Returns the delivery mode used on messages sent via this Messenger
+ * @return int
+ */
+ public int getDeliveryMode() {
+ return deliveryMode;
+ }
+
+ /**
+ * Sets the delivery mode used on messages sent via this Messenger
+ * @param deliveryMode The deliveryMode to set
+ */
+ public void setDeliveryMode(int deliveryMode) {
+ this.deliveryMode = deliveryMode;
+ }
-
/**
* Sets whether message delivery should be persistent or not
- *
* @param persistentDelivery
*/
+ *
+ * @param persistentDelivery
+ */
public void setPersistentDelivery(boolean persistentDelivery) {
- if (persistentDelivery) {
- setDeliveryMode(DeliveryMode.PERSISTENT);
- }
- else {
- setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- }
+ if (persistentDelivery) {
+ setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+ else {
+ setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
}
-
+
// Implementation methods
//-------------------------------------------------------------------------
-
+
/** Borrows a session instance from the pool */
protected abstract Session borrowSession() throws JMSException;
-
+
/** @return a session instance back to the pool */
protected abstract void returnSession(Session session) throws JMSException;
-
+
/** Deletes a session instance */
//protected abstract void deleteSession(Session session) throws JMSException;
-
+
/** Borrows a session instance from the pool */
protected abstract Session borrowListenerSession() throws JMSException;
-
+
/** @return a session instance back to the pool */
protected abstract void returnListenerSession(Session session)
throws JMSException;
-
- protected abstract boolean isTopic(Connection connection) throws JMSException;
-
+
+ protected abstract boolean isTopic(Connection connection)
+ throws JMSException;
+
protected abstract boolean isTopic(ConnectionFactory factory)
throws JMSException;
-
+
protected abstract boolean isTopic(Session session) throws JMSException;
-
+
protected abstract boolean isTopic(MessageProducer producer)
throws JMSException;
-
+
/**
* @return the current thread's MessengerSession
- */
- protected abstract MessengerSession getMessengerSession() throws JMSException;
-
-
+ */
+ protected abstract MessengerSession getMessengerSession()
+ throws JMSException;
+
/** @return a message producer for the given session and destination */
protected MessageProducer borrowMessageProducer(
Session session,
Destination destination)
throws JMSException {
-
- if ( isCacheProducers() ) {
- return getMessengerSession().getMessageProducer(destination);
+
+ if (isCacheProducers()) {
+ return getMessengerSession().getMessageProducer(destination);
}
- else {
+ else {
return createMessageProducer(session, destination);
}
}
-
- protected void returnMessageProducer(MessageProducer producer) throws JMSException {
- if ( ! isCacheProducers() ) {
+
+ protected void returnMessageProducer(MessageProducer producer)
+ throws JMSException {
+ if (!isCacheProducers()) {
producer.close();
}
}
-
+
/** @return a newly created message producer for the given session and destination */
protected MessageProducer createMessageProducer(
Session session,
Destination destination)
throws JMSException {
-
+
MessageProducer answer = null;
if (isTopic(session)) {
TopicSession topicSession = (TopicSession) session;
@@ -963,14 +989,14 @@
QueueSession queueSession = (QueueSession) session;
answer = queueSession.createSender((Queue) destination);
}
-
+
// configure the MessageProducer
if (deliveryMode != Message.DEFAULT_DELIVERY_MODE) {
- answer.setDeliveryMode(deliveryMode);
+ answer.setDeliveryMode(deliveryMode);
}
return answer;
}
-
+
/**
* @return the MessageConsumer for this threads temporary destination
* which is cached for the duration of this process.
@@ -978,16 +1004,16 @@
protected MessageConsumer getReplyToConsumer() throws JMSException {
MessengerSession messengerSession = getMessengerSession();
MessageConsumer consumer = messengerSession.getReplyToConsumer();
- if ( consumer == null ) {
- consumer = createMessageConsumer(
- messengerSession.getSession(),
- messengerSession.getReplyToDestination()
- );
+ if (consumer == null) {
+ consumer =
+ createMessageConsumer(
+ messengerSession.getSession(),
+ messengerSession.getReplyToDestination());
messengerSession.setReplyToConsumer(consumer);
}
return consumer;
}
-
+
/**
* Clears the temporary destination used to receive reply-to messages
* which will lazily force a new destination and consumer to be created next
@@ -995,65 +1021,76 @@
*/
protected void clearReplyToDestination() throws JMSException {
MessengerSession messengerSession = getMessengerSession();
-
+
messengerSession.setReplyToDestination(null);
MessageConsumer consumer = messengerSession.getReplyToConsumer();
- if ( consumer != null ) {
+ if (consumer != null) {
messengerSession.setReplyToConsumer(null);
-
+
// ensure that everything is nullified first before we close
// just in case an exception occurs
consumer.close();
}
}
-
/** @return a MessageConsumer for the given session and destination */
protected MessageConsumer borrowMessageConsumer(
Session session,
Destination destination)
throws JMSException {
-
+
MessageConsumer consumer = createMessageConsumer(session, destination);
if (log.isDebugEnabled()) {
- log.debug( "Created new consumer: " + consumer + " on destination: " + destination );
+ log.debug(
+ "Created new consumer: "
+ + consumer
+ + " on destination: "
+ + destination);
}
-
- return consumer;
+
+ return consumer;
}
-
+
/** @return a MessageConsumer for the given session, destination and selector */
protected MessageConsumer borrowMessageConsumer(
Session session,
Destination destination,
String selector)
throws JMSException {
-
- MessageConsumer consumer = createMessageConsumer(session, destination, selector);
-
+
+ MessageConsumer consumer =
+ createMessageConsumer(session, destination, selector);
+
if (log.isDebugEnabled()) {
- log.debug( "Created new consumer: " + consumer + " on destination: " + destination + " selector: " + selector );
+ log.debug(
+ "Created new consumer: "
+ + consumer
+ + " on destination: "
+ + destination
+ + " selector: "
+ + selector);
}
-
- return consumer;
+
+ return consumer;
}
-
+
/**
* Returns a message consumer back to the pool.
* By default this method will close message consumers though we should
* be able to cache then
*/
- protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException {
+ protected void returnMessageConsumer(MessageConsumer messageConsumer)
+ throws JMSException {
if (log.isDebugEnabled()) {
- log.debug( "Closing consumer: " + messageConsumer );
+ log.debug("Closing consumer: " + messageConsumer);
}
-
- if ( messageConsumer != null ) {
+
+ if (messageConsumer != null) {
messageConsumer.close();
}
}
-
+
/** @return a new MessageConsumer for the given session and destination */
protected MessageConsumer createMessageConsumer(
Session session,
@@ -1064,10 +1101,15 @@
if (isDurable()) {
return topicSession.createDurableSubscriber(
(Topic) destination,
- getDurableName());
+ getDurableName(),
+ null,
+ isNoLocal());
}
else {
- return topicSession.createSubscriber((Topic) destination);
+ return topicSession.createSubscriber(
+ (Topic) destination,
+ null,
+ isNoLocal());
}
}
else {
@@ -1075,8 +1117,7 @@
return queueSession.createReceiver((Queue) destination);
}
}
-
-
+
/** @return a new MessageConsumer for the given session, destination and selector */
protected MessageConsumer createMessageConsumer(
Session session,
@@ -1104,7 +1145,7 @@
return queueSession.createReceiver((Queue) destination, selector);
}
}
-
+
/** @return a new QueueBrowser for the given session and destination */
protected QueueBrowser createBrowser(
Session session,
@@ -1112,25 +1153,25 @@
throws JMSException {
if (isTopic(session)) {
return null;
- }
+ }
else {
QueueSession queueSession = (QueueSession) session;
return queueSession.createBrowser((Queue) destination);
}
}
-
+
protected Queue getQueue(QueueSession session, String subject)
throws JMSException {
// XXXX: might want to cache
return session.createQueue(subject);
}
-
+
protected Topic getTopic(TopicSession session, String subject)
throws JMSException {
// XXXX: might want to cache
return session.createTopic(subject);
}
-
+
protected Destination getReplyToDestination() throws JMSException {
return getMessengerSession().getReplyToDestination();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org