You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2002/02/01 15:32:33 UTC
cvs commit: jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger InitMessengerServlet.java MessengerManager.java MessengerSupport.java
jstrachan 02/02/01 06:32:33
Modified: messenger/src/java/org/apache/commons/messenger
MessengerManager.java MessengerSupport.java
Added: messenger/src/java/org/apache/commons/messenger
InitMessengerServlet.java
Log:
Applied William's patches to close producers after using them.
Revision Changes Path
1.5 +3 -11 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerManager.java
Index: MessengerManager.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerManager.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- MessengerManager.java 12 Oct 2001 16:37:09 -0000 1.4
+++ MessengerManager.java 1 Feb 2002 14:32:33 -0000 1.5
@@ -5,7 +5,7 @@
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
- * $Id: MessengerManager.java,v 1.4 2001/10/12 16:37:09 jstrachan Exp $
+ * $Id: MessengerManager.java,v 1.5 2002/02/01 14:32:33 jstrachan Exp $
*/
package org.apache.commons.messenger;
@@ -19,7 +19,7 @@
/** <p><code>MessengerManager</code> is a manager of {@link Messenger} instances.</p>
*
* @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class MessengerManager {
@@ -61,15 +61,7 @@
* from a given XML deployment configuration document
*/
public static void configure( String xmlURL ) throws JMSException {
- try {
- MessengerDigester digester = new MessengerDigester();
- setInstance( (MessengerManager) digester.parse( xmlURL ) );
- }
- catch (Exception e) {
- JMSException newException = new JMSException( "Could not load the Messenger XML config file from: " + xmlURL );
- newException.setLinkedException(e);
- throw newException;
- }
+ setInstance( load( xmlURL ) );
}
/** Returns the messenger for the given name */
1.16 +98 -93 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
Index: MessengerSupport.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- MessengerSupport.java 25 Oct 2001 21:07:12 -0000 1.15
+++ MessengerSupport.java 1 Feb 2002 14:32:33 -0000 1.16
@@ -4,8 +4,8 @@
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
- *
- * $Id: MessengerSupport.java,v 1.15 2001/10/25 21:07:12 jstrachan Exp $
+ *
+ * $Id: MessengerSupport.java,v 1.16 2002/02/01 14:32:33 jstrachan Exp $
*/
package org.apache.commons.messenger;
@@ -45,26 +45,26 @@
* connection and session creation and the pooling strategy.</p>
*
* @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
- * @version $Revision: 1.15 $
+ * @version $Revision: 1.16 $
*/
public abstract class MessengerSupport implements Messenger {
private static final boolean CACHE_REQUESTOR = true;
-
-
+
+
/** The name of the Messenger */
private String name;
-
+
/** are topic subscribers durable? */
private boolean durable;
-
+
/** the durable name used for durable topic based subscriptions */
private String durableName;
-
+
/** whether local messages are ignored when topic based subscription is used
* with a message selector */
private boolean noLocal;
-
+
/** A Map of ListenerKey objects to MessageConsumer objects */
private Map listeners = new HashMap();
@@ -84,11 +84,11 @@
/** The inbox which is used for the call() methods */
private Destination replyToDestination;
-
-
+
+
public MessengerSupport() {
}
-
+
public String toString() {
try {
Session session = borrowSession();
@@ -100,7 +100,7 @@
return super.toString() + " session: " + e.toString();
}
}
-
+
public Destination getDestination(String subject) throws JMSException {
Session session = borrowSession();
try {
@@ -132,26 +132,28 @@
returnSession( session );
}
}
-
+
public void send( Destination destination, Message message ) throws JMSException {
Session session = borrowSession();
+ MessageProducer producer = null;
try {
- MessageProducer producer = getMessageProducer( session, destination );
+ producer = getMessageProducer( session, destination );
if ( producer instanceof TopicPublisher ) {
((TopicPublisher) producer).publish( message );
- }
+ }
else {
((QueueSender) producer).send( message );
}
}
finally {
+ producer.close();
returnSession( session );
}
}
public Message call( Destination destination, Message message ) throws JMSException {
Session session = borrowSession();
- try {
+ try {
if ( session instanceof TopicSession ) {
TopicRequestor requestor = getTopicRequestor( (TopicSession) session, (Topic) destination );
return requestor.request( message );
@@ -166,16 +168,16 @@
}
}
-/*
+/*
public Message call( Destination destination, Message message ) throws JMSException {
Session session = borrowSession();
try {
Destination replyTo = getReplyToDestination();
message.setJMSReplyTo(replyTo);
-
+
MessageProducer producer = getMessageProducer( session, destination );
MessageConsumer consumer = getMessageConsumer( session, replyTo );
-
+
if ( session instanceof TopicSession ) {
((TopicPublisher) producer).publish( message );
}
@@ -189,16 +191,18 @@
}
}
*/
-
+
public Message call( Destination destination, Message message, long timeoutMillis ) throws JMSException {
Session session = borrowSession();
+ MessageProducer producer = null;
+ MessageConsumer consumer = null;
try {
Destination replyTo = getReplyToDestination();
message.setJMSReplyTo(replyTo);
-
- MessageProducer producer = getMessageProducer( session, destination );
- MessageConsumer consumer = getMessageConsumer( session, replyTo );
-
+
+ producer = getMessageProducer( session, destination );
+ consumer = getMessageConsumer( session, replyTo );
+
if ( session instanceof TopicSession ) {
((TopicPublisher) producer).publish( message );
}
@@ -208,10 +212,11 @@
return consumer.receive(timeoutMillis);
}
finally {
+ producer.close();
returnSession( session );
}
}
-
+
public Message receive(Destination destination) throws JMSException {
Session session = borrowSession();
try {
@@ -222,7 +227,7 @@
returnSession( session );
}
}
-
+
public Message receive(Destination destination, String selector) throws JMSException {
Session session = borrowSession();
try {
@@ -233,7 +238,7 @@
returnSession( session );
}
}
-
+
public Message receive(Destination destination, long timeoutMillis) throws JMSException {
Session session = borrowSession();
try {
@@ -244,7 +249,7 @@
returnSession( session );
}
}
-
+
public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
Session session = borrowSession();
try {
@@ -255,7 +260,7 @@
returnSession( session );
}
}
-
+
public Message receiveNoWait(Destination destination) throws JMSException {
Session session = borrowSession();
try {
@@ -287,7 +292,7 @@
returnSession( session );
}
}
-
+
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
Session session = borrowSession();
try {
@@ -297,7 +302,7 @@
returnSession( session );
}
}
-
+
public void run() {
// don't return sessions which throw an exception
try {
@@ -309,11 +314,11 @@
// ### ignore
}
}
-
+
public ConnectionConsumer createConnectionConsumer(Destination destination, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return createConnectionConsumer(destination, null, sessionPool, maxMessages);
}
-
+
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
Connection connection = getConnection();
if ( connection instanceof TopicConnection ) {
@@ -332,10 +337,10 @@
}
public abstract Connection getConnection() throws JMSException;
-
+
// Listener API
- //-------------------------------------------------------------------------
-
+ //-------------------------------------------------------------------------
+
public void addListener(Destination destination, MessageListener listener) throws JMSException {
if ( listener instanceof MessengerListener ) {
MessengerListener messengerListener = (MessengerListener) listener;
@@ -345,7 +350,7 @@
try {
MessageConsumer consumer = createMessageConsumer( session, destination );
consumer.setMessageListener( listener );
-
+
ListenerKey key = new ListenerKey( destination, listener );
listeners.put( key, consumer );
}
@@ -353,7 +358,7 @@
returnListenerSession( session );
}
}
-
+
public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
if ( listener instanceof MessengerListener ) {
MessengerListener messengerListener = (MessengerListener) listener;
@@ -363,7 +368,7 @@
try {
MessageConsumer consumer = createMessageConsumer( session, destination, selector );
consumer.setMessageListener( listener );
-
+
ListenerKey key = new ListenerKey( destination, listener, selector );
listeners.put( key, consumer );
}
@@ -371,7 +376,7 @@
returnListenerSession( session );
}
}
-
+
public void removeListener(Destination destination, MessageListener listener ) throws JMSException {
ListenerKey key = new ListenerKey( destination, listener );
@@ -391,10 +396,10 @@
consumer.close();
}
-
+
// Message factory methods
- //-------------------------------------------------------------------------
-
+ //-------------------------------------------------------------------------
+
public BytesMessage createBytesMessage() throws JMSException {
Session session = borrowSession();
try {
@@ -404,7 +409,7 @@
returnSession( session );
}
}
-
+
public MapMessage createMapMessage() throws JMSException {
Session session = borrowSession();
try {
@@ -414,8 +419,8 @@
returnSession( session );
}
}
-
- public Message createMessage() throws JMSException {
+
+ public Message createMessage() throws JMSException {
Session session = borrowSession();
try {
return session.createMessage();
@@ -424,7 +429,7 @@
returnSession( session );
}
}
-
+
public ObjectMessage createObjectMessage() throws JMSException {
Session session = borrowSession();
try {
@@ -434,7 +439,7 @@
returnSession( session );
}
}
-
+
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
Session session = borrowSession();
try {
@@ -444,7 +449,7 @@
returnSession( session );
}
}
-
+
public StreamMessage createStreamMessage() throws JMSException {
Session session = borrowSession();
try {
@@ -454,7 +459,7 @@
returnSession( session );
}
}
-
+
public TextMessage createTextMessage() throws JMSException {
Session session = borrowSession();
try {
@@ -464,7 +469,7 @@
returnSession( session );
}
}
-
+
public TextMessage createTextMessage(String text) throws JMSException {
Session session = borrowSession();
try {
@@ -474,7 +479,7 @@
returnSession( session );
}
}
-
+
public void commit() throws JMSException {
Session session = borrowSession();
try {
@@ -484,7 +489,7 @@
returnSession( session );
}
}
-
+
public void rollback() throws JMSException {
Session session = borrowSession();
try {
@@ -494,39 +499,39 @@
returnSession( session );
}
}
-
+
public void close() throws JMSException {
getConnection().close();
}
-
+
// Properties
- //-------------------------------------------------------------------------
+ //-------------------------------------------------------------------------
/** Gets the name that this Messenger is called in a MessengerManager */
public String getName() {
return name;
}
-
+
/** Sets the name that this Messenger is called in a MessengerManager */
public void setName(String name) {
this.name = name;
}
-
+
/** Gets whether topic subscribers are durable or not */
public boolean isDurable() {
return noLocal;
}
-
+
/** Sets whether topic subscribers are durable or not */
public void setDurable(boolean durable) {
this.durable = durable;
}
-
+
/** Returns the durable name used for durable topic based subscriptions */
public String getDurableName() {
return durableName;
}
-
+
/** Sets the durable name used for durable topic based subscriptions */
public void setDurableName(String durableName) {
this.durableName = durableName;
@@ -537,20 +542,20 @@
public boolean isNoLocal() {
return noLocal;
}
-
+
/** Sets whether local messages are ignored when topic based subscription is used
* with a message selector */
public void setNoLocal(boolean noLocal) {
this.noLocal = noLocal;
}
-
+
// Implementation methods
- //-------------------------------------------------------------------------
+ //-------------------------------------------------------------------------
+
-
/** Borrows a session instance from the pool */
protected abstract Session borrowSession() throws JMSException;
-
+
/** Returns a session instance back to the pool */
protected abstract void returnSession(Session session) throws JMSException;
@@ -559,14 +564,14 @@
/** Borrows a session instance from the pool */
protected abstract Session borrowListenerSession() throws JMSException;
-
+
/** Returns a session instance back to the pool */
protected abstract void returnListenerSession(Session session) throws JMSException;
/** Returns a message producer for the given session and destination */
protected MessageProducer getMessageProducer( Session session, Destination destination ) throws JMSException {
return createMessageProducer( session, destination );
-/**
+/**
MessageProducer producer = (MessageProducer) producers.get( destination );
if ( producer == null ) {
producer = createMessageProducer( session, destination );
@@ -574,7 +579,7 @@
return producer;
*/
}
-
+
/** Returns a newly created message producer for the given session and destination */
protected MessageProducer createMessageProducer( Session session, Destination destination ) throws JMSException {
if ( session instanceof TopicSession ) {
@@ -590,7 +595,7 @@
/** Returns a MessageConsumer for the given session and destination */
protected MessageConsumer getMessageConsumer( Session session, Destination destination ) throws JMSException {
return createMessageConsumer( session, destination );
-/*
+/*
MessageConsumer consumer = (MessageConsumer) consumers.get( destination );
if ( consumer == null ) {
consumer = createMessageConsumer( session, destination );
@@ -598,25 +603,25 @@
return consumer;
*/
}
-
+
/** Returns a MessageConsumer for the given session, destination and selector */
protected MessageConsumer getMessageConsumer( Session session, Destination destination, String selector ) throws JMSException {
// XXXX: could do caching one day
return createMessageConsumer( session, destination, selector );
}
-
+
/** Returns a new MessageConsumer for the given session and destination */
protected MessageConsumer createMessageConsumer( Session session, Destination destination ) throws JMSException {
if ( session instanceof TopicSession ) {
TopicSession topicSession = (TopicSession) session;
if ( isDurable() ) {
- return topicSession.createDurableSubscriber(
- (Topic) destination,
- getDurableName()
+ return topicSession.createDurableSubscriber(
+ (Topic) destination,
+ getDurableName()
);
}
else {
- return topicSession.createSubscriber(
+ return topicSession.createSubscriber(
(Topic) destination
);
}
@@ -626,23 +631,23 @@
return queueSession.createReceiver( (Queue) destination );
}
}
-
+
/** Returns a new MessageConsumer for the given session, destination and selector */
protected MessageConsumer createMessageConsumer( Session session, Destination destination, String selector ) throws JMSException {
if ( session instanceof TopicSession ) {
TopicSession topicSession = (TopicSession) session;
if ( isDurable() ) {
- return topicSession.createDurableSubscriber(
- (Topic) destination,
- getDurableName(),
- selector,
- isNoLocal()
+ return topicSession.createDurableSubscriber(
+ (Topic) destination,
+ getDurableName(),
+ selector,
+ isNoLocal()
);
}
else {
- return topicSession.createSubscriber(
- (Topic) destination,
- selector,
+ return topicSession.createSubscriber(
+ (Topic) destination,
+ selector,
isNoLocal()
);
}
@@ -650,29 +655,29 @@
else {
QueueSession queueSession = (QueueSession) session;
return queueSession.createReceiver(
- (Queue) destination,
- selector
+ (Queue) destination,
+ selector
);
}
}
-
+
protected Queue getQueue(QueueSession session, String subject) throws JMSException {
// XXXX: might want to cache
return session.createQueue( subject );
}
-
+
protected Topic getTopic(TopicSession session, String subject) throws JMSException {
// XXXX: might want to cache
return session.createTopic( subject );
}
-
+
protected Destination getReplyToDestination() throws JMSException {
if ( replyToDestination == null ) {
replyToDestination = createTemporaryDestination();
}
return replyToDestination;
}
-
+
protected TopicRequestor getTopicRequestor( TopicSession session, Topic destination ) throws JMSException {
if ( CACHE_REQUESTOR ) {
Map requestors = (Map) requestorsMap.get();
@@ -687,7 +692,7 @@
return new TopicRequestor( session, destination );
}
}
-
+
protected QueueRequestor getQueueRequestor( QueueSession session, Queue destination ) throws JMSException {
if ( CACHE_REQUESTOR ) {
Map requestors = (Map) requestorsMap.get();
1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/InitMessengerServlet.java
Index: InitMessengerServlet.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
* $Id: InitMessengerServlet.java,v 1.1 2002/02/01 14:32:33 jstrachan Exp $
*/
package org.apache.commons.messenger;
import java.net.URL;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
/** <p><code>InitMessengerServlet</code> is a simple servlet that
* will initialize the MessengerManager from a URL specified in the
* web.xml deployment descriptor.</p>
*
* @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
* @version $Revision: 1.1 $
*/
public class InitMessengerServlet extends HttpServlet {

    /**
     * Set to <code>true</code> only after the MessengerManager has been
     * configured successfully. It is static so that several instances of
     * this servlet configure the manager once between them; all access is
     * guarded by the lock on <code>InitMessengerServlet.class</code>.
     */
    private static boolean initialized = false;

    public InitMessengerServlet() {
    }

    /**
     * Configures the MessengerManager from the resource named by the
     * required <code>config</code> init parameter.
     *
     * <p>The parameter value is resolved through
     * <code>ServletContext.getResource()</code> and the resulting URL is
     * handed to <code>MessengerManager.configure()</code>. Once that has
     * succeeded, later calls (on this or any other instance) do nothing.
     * If configuration fails the flag is left unset, so a later call to
     * <code>init()</code> tries again rather than silently skipping the
     * configuration.</p>
     *
     * @throws ServletException if the <code>config</code> parameter is
     *      missing or empty, if no resource exists at that path, or if
     *      the MessengerManager could not be configured from it
     */
    public void init() throws ServletException {
        synchronized ( InitMessengerServlet.class ) {
            if ( initialized ) {
                return;
            }

            getServletContext().log( "About to initialize MessengerManager" );

            String config = getRequiredInitParmeter( "config", "The URL of the Messenger XML deployment document" );

            URL url = null;
            try {
                url = getServletContext().getResource( config );
                if ( url != null ) {
                    MessengerManager.configure( url.toString() );
                }
            }
            catch (Exception e) {
                throw new ServletException( "Failed to initialise MessengerManager from config: " + config + ". Reason : " + e, e );
            }

            // getResource() returns null when nothing is mapped to the path;
            // report that explicitly instead of as a NullPointerException
            if ( url == null ) {
                throw new ServletException(
                    "Failed to initialise MessengerManager from config: " + config
                    + ". Reason : no resource could be found at that path"
                );
            }

            // only mark as done once configuration has actually succeeded
            initialized = true;
        }
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Returns the value of a servlet init parameter which must be present.
     *
     * @param key the name of the init parameter to look up
     * @param description human readable description of the parameter,
     *      included in the error message when the parameter is missing
     * @return the non-empty value of the init parameter
     * @throws ServletException if the parameter is not set or is empty
     */
    protected String getRequiredInitParmeter(String key, String description) throws ServletException {
        String value = getInitParameter( key );
        if ( value == null || value.length() == 0 ) {
            throw new ServletException(
                "No initialization parameter for parameter: " + key
                + " description: " + description
            );
        }
        return value;
    }
}
--
To unsubscribe, e-mail: <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>