You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@cocoon.apache.org by un...@apache.org on 2005/06/17 12:42:13 UTC

svn commit: r191126 - in /cocoon: blocks/jms/trunk/WEB-INF/xconf/ blocks/jms/trunk/java/org/apache/cocoon/components/jms/ trunk/

Author: unico
Date: Fri Jun 17 03:42:11 2005
New Revision: 191126

URL: http://svn.apache.org/viewcvs?rev=191126&view=rev
Log:
JMS block: Connection failures can now be recovered from. The default JMSConnectionManager implementation
detects when JMS connections are severed and schedules reconnection attempts with the cron scheduler
in order to re-establish the connection. As soon as reconnection succeeds, it notifies its listeners
so that they may recover as well and refresh their JMS sessions. The provided abstract MessageListener
and JMS publisher that users are recommended to use as base classes for their concrete JMS needs have been
updated to use this mechanism.

Modified:
    cocoon/blocks/jms/trunk/WEB-INF/xconf/cocoon-jms.xconf
    cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessageListener.java
    cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessagePublisher.java
    cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/JMSConnectionManagerImpl.java
    cocoon/trunk/gump.xml
    cocoon/trunk/status.xml

Modified: cocoon/blocks/jms/trunk/WEB-INF/xconf/cocoon-jms.xconf
URL: http://svn.apache.org/viewcvs/cocoon/blocks/jms/trunk/WEB-INF/xconf/cocoon-jms.xconf?rev=191126&r1=191125&r2=191126&view=diff
==============================================================================
--- cocoon/blocks/jms/trunk/WEB-INF/xconf/cocoon-jms.xconf (original)
+++ cocoon/blocks/jms/trunk/WEB-INF/xconf/cocoon-jms.xconf Fri Jun 17 03:42:11 2005
@@ -40,6 +40,10 @@
     |   equivalent to "java.naming.factory.initial".
     | - connection-factory: the JNDI lookup name of the javax.jms.ConnectionFactory service.
     | - username / password: optional connection credentials
+    | - auto-reconnect: when this parameter is set to true and the JMS connection fails,
+    |   a cron job will be scheduled to attempt to re-establish the connection. Default is false.
+    | - auto-reconnect-delay: the amount of time in ms between reconnection attempts. 
+    |   Only applies when auto-reconnect is on. Defaults to 1000ms.
     + -->
   <jms-connection-manager logger="core.jms">
     <topic-connection name="local-topics">
@@ -50,6 +54,8 @@
       <!--
       <parameter name="username" value="user"/>
       <parameter name="password" value="secret"/>
+      <parameter name="auto-reconnect" value="false"/>
+      <parameter name="auto-reconnect-delay" value="1000"/>
       -->
     </topic-connection>
   </jms-connection-manager>

Modified: cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessageListener.java
URL: http://svn.apache.org/viewcvs/cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessageListener.java?rev=191126&r1=191125&r2=191126&view=diff
==============================================================================
--- cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessageListener.java (original)
+++ cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessageListener.java Fri Jun 17 03:42:11 2005
@@ -15,7 +15,6 @@
  */
 package org.apache.cocoon.components.jms;
 
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -23,7 +22,6 @@
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
-
 import org.apache.avalon.framework.activity.Disposable;
 import org.apache.avalon.framework.activity.Initializable;
 import org.apache.avalon.framework.logger.AbstractLogEnabled;
@@ -35,8 +33,10 @@
 import org.apache.avalon.framework.service.Serviceable;
 
 /**
- * Abstract JMS MessageListener. Use this as a basis for concrete
- * MessageListener implementations.
+ * Abstract {@link javax.jms.MessageListener} implementation. 
+ * Use this as a basis for concrete MessageListener implementations. 
+ * When used in conjunction with the default {@link org.apache.cocoon.components.jms.JMSConnectionManager} 
+ * implementation this class supports automatic reconnection when the connection gets severed.
  * 
  * <p>Parameters:</p>
  * <table border="1">
@@ -73,10 +73,11 @@
  *  </tbody>
  * </table>
  * 
- * @version CVS $Id: AbstractMessageListener.java,v 1.1 2004/07/01 13:43:00 unico Exp $
+ * @version CVS $Id$
  */
 public abstract class AbstractMessageListener extends AbstractLogEnabled
-implements MessageListener, ExceptionListener, Serviceable, Parameterizable, Initializable, Disposable {
+implements MessageListener, Serviceable, Parameterizable, Initializable, Disposable,
+           JMSConnectionEventListener {
 
     // ---------------------------------------------------- Constants
 
@@ -98,7 +99,7 @@
     protected int m_acknowledgeMode;
 
     /* connection manager component */
-    private JMSConnectionManager m_jmsConnectionManager;
+    private JMSConnectionManager m_connectionManager;
 
     /* our session */
     private TopicSession m_session;
@@ -113,7 +114,7 @@
 
     public void service(ServiceManager manager) throws ServiceException {
         m_manager = manager;
-        m_jmsConnectionManager = (JMSConnectionManager) m_manager.lookup(JMSConnectionManager.ROLE);
+        m_connectionManager = (JMSConnectionManager) m_manager.lookup(JMSConnectionManager.ROLE);
     }
 
     public void parameterize(Parameters parameters) throws ParameterException {
@@ -128,15 +129,48 @@
 
     /**
      * Registers this MessageListener as a TopicSubscriber to the configured Topic.
+     * @throws Exception
      */
     public void initialize() throws Exception {
+        if (m_connectionManager instanceof JMSConnectionEventNotifier) {
+            ((JMSConnectionEventNotifier) m_connectionManager).addConnectionListener(m_connectionName, this);
+        }
+        createSessionAndSubscriber();
+    }
+
+    public void dispose() {
+        closeSubscriberAndSession();
+        m_manager.release(m_connectionManager);
+    }
 
+    public void onConnection(String name) {
+        if (getLogger().isInfoEnabled()) {
+            getLogger().info("Creating subscriber because of reconnection");
+        }
+        try {
+            createSessionAndSubscriber();
+        }
+        catch (JMSException e) {
+            if (getLogger().isWarnEnabled()) {
+                getLogger().warn("Reinitialization after reconnection failed", e);
+            }
+        }
+    }
+
+    public void onDisconnection(String name) {
+        if (getLogger().isInfoEnabled()) {
+            getLogger().info("Closing subscriber because of disconnection");
+        }
+        closeSubscriberAndSession();
+    }
+
+    private void createSessionAndSubscriber() throws JMSException {
         // set the default acknowledge mode to dups
         // concrete implementations may want to override this
         m_acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
 
         // register this MessageListener with a TopicSubscriber
-        final TopicConnection connection = (TopicConnection) m_jmsConnectionManager.getConnection(m_connectionName);
+        final TopicConnection connection = (TopicConnection) m_connectionManager.getConnection(m_connectionName);
         if (connection != null) {
             m_session = connection.createTopicSession(false, m_acknowledgeMode);
             final Topic topic = m_session.createTopic(m_topicName);
@@ -153,16 +187,18 @@
                 getLogger().warn("Could not obtain JMS connection '" + m_connectionName + "'");
             }
         }
-
     }
 
-    public void dispose() {
+    private void closeSubscriberAndSession() {
         if (m_subscriber != null) {
             try {
                 m_subscriber.close();
             } catch (JMSException e) {
                 getLogger().error("Error closing subscriber", e);
             }
+            finally {
+                m_subscriber = null;
+            }
         }
         if (m_session != null) {
             try {
@@ -171,13 +207,9 @@
             catch (JMSException e) {
                 getLogger().error("Error closing session", e);
             }
-        }
-        this.m_manager.release(m_jmsConnectionManager);
-    }
-
-    public void onException(JMSException exception) {
-        if (getLogger().isWarnEnabled()) {
-            getLogger().warn("JMS problem detected", exception);
+            finally {
+                m_session = null;
+            }
         }
     }
 }

Modified: cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessagePublisher.java
URL: http://svn.apache.org/viewcvs/cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessagePublisher.java?rev=191126&r1=191125&r2=191126&view=diff
==============================================================================
--- cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessagePublisher.java (original)
+++ cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/AbstractMessagePublisher.java Fri Jun 17 03:42:11 2005
@@ -37,6 +37,8 @@
 /**
  * Abstract JMS message publisher. Use this as a basis for components 
  * that want to publish JMS messages.
+ * When used in conjunction with the default {@link org.apache.cocoon.components.jms.JMSConnectionManager} 
+ * implementation this class supports automatic reconnection when the connection gets severed.
  * 
  * <p>Parameters:</p>
  * <table border="1">
@@ -84,10 +86,10 @@
  *  </tbody>
  * </table>
  * 
- * @version CVS $Id: AbstractMessagePublisher.java,v 1.1 2004/07/01 13:43:00 unico Exp $
+ * @version CVS $Id$
  */
 public abstract class AbstractMessagePublisher extends AbstractLogEnabled
-implements Serviceable, Parameterizable, Initializable, Disposable {
+implements Serviceable, Parameterizable, Initializable, Disposable, JMSConnectionEventListener {
 
     // ---------------------------------------------------- Constants
 
@@ -136,7 +138,58 @@
     }
 
     public void initialize() throws Exception {
+        if (m_connectionManager instanceof JMSConnectionEventNotifier) {
+            ((JMSConnectionEventNotifier) m_connectionManager).addConnectionListener(m_connectionName, this);
+        }
+        createSessionAndPublisher();
+    }
+
+    public void dispose() {
+        closePublisherAndSession();
+        if (m_manager != null) {
+            if (m_connectionManager != null) {
+                m_manager.release(m_connectionManager);
+            }
+        }
+    }
+
+    // ---------------------------------------------------- JMSConnectionEventListener
+
+    public void onConnection(String name) {
+        if (getLogger().isInfoEnabled()) {
+            getLogger().info("Creating publisher because of reconnection");
+        }
+        try {
+            createSessionAndPublisher();
+        }
+        catch (JMSException e) {
+            if (getLogger().isWarnEnabled()) {
+                getLogger().warn("Reinitialization after reconnection failed", e);
+            }
+        }
+    }
+
+    public void onDisconnection(String name) {
+        if (getLogger().isInfoEnabled()) {
+            getLogger().info("Closing subscriber because of disconnection");
+        }
+        closePublisherAndSession();
+    }
+
+    // ---------------------------------------------------- Implementation
+
+    /**
+     * Concrete classes call this method to publish messages.
+     */
+    protected synchronized void publishMessage(Message message) throws JMSException {
+        // TODO: discover disconnected state and queue messages until connected.
+        if (getLogger().isDebugEnabled()) {
+            getLogger().debug("Publishing message '" + message + "'");
+        }
+        m_publisher.publish(message, m_mode, m_priority, m_timeToLive);
+    }
 
+    private void createSessionAndPublisher() throws JMSException {
         // set the default acknowledge mode
         // concrete implementations may override this
         m_acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
@@ -153,10 +206,9 @@
                 getLogger().warn("Could not obtain JMS connection '" + m_connectionName + "'");
             }
         }
-
     }
 
-    public void dispose() {
+    private void closePublisherAndSession() {
         if (m_publisher != null) {
             try {
                 m_publisher.close();
@@ -172,23 +224,6 @@
                 getLogger().warn("Error closing session.", e);
             }
         }
-        if (m_manager != null) {
-            if (m_connectionManager != null) {
-                m_manager.release(m_connectionManager);
-            }
-        }
-    }
-
-    // ---------------------------------------------------- Implementation
-
-    /**
-     * Concrete classes call this method to publish messages.
-     */
-    protected synchronized void publishMessage(Message message) throws JMSException {
-        if (getLogger().isDebugEnabled()) {
-            getLogger().debug("Publishing message '" + message + "'");
-        }
-        m_publisher.publish(message, m_mode, m_priority, m_timeToLive);
     }
 
 }

Modified: cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/JMSConnectionManagerImpl.java
URL: http://svn.apache.org/viewcvs/cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/JMSConnectionManagerImpl.java?rev=191126&r1=191125&r2=191126&view=diff
==============================================================================
--- cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/JMSConnectionManagerImpl.java (original)
+++ cocoon/blocks/jms/trunk/java/org/apache/cocoon/components/jms/JMSConnectionManagerImpl.java Fri Jun 17 03:42:11 2005
@@ -15,13 +15,16 @@
  */
 package org.apache.cocoon.components.jms;
 
+import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
-
+import java.util.Set;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
@@ -29,9 +32,7 @@
 import javax.jms.TopicConnectionFactory;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-
-import org.apache.cocoon.components.jms.JMSConnectionManager;
-
+import org.apache.avalon.framework.CascadingException;
 import org.apache.avalon.framework.activity.Disposable;
 import org.apache.avalon.framework.activity.Initializable;
 import org.apache.avalon.framework.activity.Startable;
@@ -39,15 +40,22 @@
 import org.apache.avalon.framework.configuration.Configuration;
 import org.apache.avalon.framework.configuration.ConfigurationException;
 import org.apache.avalon.framework.logger.AbstractLogEnabled;
+import org.apache.avalon.framework.logger.Logger;
 import org.apache.avalon.framework.parameters.ParameterException;
 import org.apache.avalon.framework.parameters.Parameters;
+import org.apache.avalon.framework.service.ServiceException;
+import org.apache.avalon.framework.service.ServiceManager;
+import org.apache.avalon.framework.service.Serviceable;
 import org.apache.avalon.framework.thread.ThreadSafe;
+import org.apache.cocoon.components.cron.CronJob;
+import org.apache.cocoon.components.cron.JobScheduler;
 
 /**
  * {@link org.apache.cocoon.components.jms.JMSConnectionManager} implementation.
  */
 public class JMSConnectionManagerImpl extends AbstractLogEnabled 
-implements JMSConnectionManager, Configurable, Initializable, Startable, Disposable, ThreadSafe {
+implements JMSConnectionManager, Serviceable, Configurable, Initializable,
+           Startable, Disposable, ThreadSafe, JMSConnectionEventNotifier {
 
     // ---------------------------------------------------- Constants
     
@@ -63,18 +71,29 @@
     private static final String CONNECTION_FACTORY_PARAM = "connection-factory";
     private static final String USERNAME_PARAM = "username";
     private static final String PASSWORD_PARAM = "password";
+    private static final String AUTO_RECONNECT_PARAM = "auto-reconnect";
+    private static final String AUTO_RECONNECT_DELAY_PARAM = "auto-reconnect-delay";
+    
+    private static final int DEFAULT_AUTO_RECONNECT_DELAY = 1000;
     
     private static final String JNDI_PROPERTY_PREFIX = "java.naming.";
 
     // ---------------------------------------------------- Instance variables
 
+    private ServiceManager m_serviceManager;
+    
     private Map m_configurations;
     private Map m_connections;
+    private Map m_listeners;
 
     // ---------------------------------------------------- Lifecycle
 
     public JMSConnectionManagerImpl() {
     }
+    
+    public void service(ServiceManager manager) {
+        m_serviceManager = manager;
+    }
 
     public void configure(Configuration configuration) throws ConfigurationException {
         m_configurations = new HashMap(configuration.getChildren().length);
@@ -103,39 +122,19 @@
     }
 
     public void initialize() throws Exception {
+        m_listeners = new HashMap();
         m_connections = new HashMap(m_configurations.size());
         final Iterator iter = m_configurations.values().iterator();
-        try {
-            while (iter.hasNext()) {
-                final ConnectionConfiguration cc = (ConnectionConfiguration) iter.next();
-                final InitialContext context = createInitialContext(cc.getJNDIProperties());
-                final ConnectionFactory factory = (ConnectionFactory) context.lookup(cc.getConnectionFactory());
-                final Connection connection = createConnection(factory, cc);
+
+        while (iter.hasNext()) {
+            final ConnectionConfiguration cc = (ConnectionConfiguration) iter.next();
+            try {
+                final Connection connection = createConnection(cc);
+                
                 m_connections.put(cc.getName(), connection);
             }
-        }
-        catch (NamingException e) {
-            if (getLogger().isWarnEnabled()) {
-                Throwable rootCause = e.getRootCause();
-                if (rootCause != null) {
-                    String message = e.getRootCause().getMessage();
-                    if (rootCause instanceof ClassNotFoundException) {
-                        String info = "WARN! *** JMS block is installed but jms client library not found. ***\n" + 
-                            "- For the jms block to work you must install and start a JMS server and " +
-                            "place the client jar in WEB-INF/lib.";
-                            if (message.indexOf("exolab") > 0 ) {
-                                info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon.";
-                            }
-                        System.err.println(info);
-                        getLogger().warn(info,e);
-                    } else {
-                        System.out.println(message);
-                        getLogger().warn("Cannot get Initial Context. Is the JNDI server reachable?",e);
-                    }
-                }
-                else {
-                    getLogger().warn("Failed to initialize JMS.",e);
-                }
+            catch (NamingException e) {
+                // ignore, warnings for NamingExceptions are logged by createConnection method
             }
         }
         m_configurations = null;
@@ -157,17 +156,20 @@
         final Iterator iter = m_connections.entrySet().iterator();
         while (iter.hasNext()) {
             final Map.Entry entry = (Map.Entry) iter.next();
-            if (getLogger().isDebugEnabled()) {
-                getLogger().debug("Stopping JMS connection " + entry.getKey());
-            }
-            try {
-                final Connection connection = (Connection) entry.getValue();
-                connection.stop();
-            }
-            catch (JMSException e) {
-                getLogger().error("Error stopping JMS connection " + entry.getKey(), e);
-            }
+            stopConnection((String) entry.getKey(), (Connection) entry.getValue());
+        }
+    }
+
+    void stopConnection(String name, Connection connection) {
+        if (getLogger().isDebugEnabled()) {
+            getLogger().debug("Stopping JMS connection " + name);
+        }
+        try {
+            connection.stop();
         }
+        catch (JMSException e) {
+            // ignore
+        }        
     }
 
     public void dispose() {
@@ -189,25 +191,73 @@
 
     // ---------------------------------------------------- ConnectionManager
 
-    public Connection getConnection(String name) {
+    public synchronized Connection getConnection(String name) {
         return (Connection) m_connections.get(name);
     }
 
-    public TopicConnection getTopicConnection(String name) {
+    public synchronized TopicConnection getTopicConnection(String name) {
         return (TopicConnection) m_connections.get(name);
     }
 
-    public QueueConnection getQueueConnection(String name) {
+    public synchronized QueueConnection getQueueConnection(String name) {
         return (QueueConnection) m_connections.get(name);
     }
 
+    // ---------------------------------------------------- JMSConnectionEventNotifier
+    
+    public synchronized void addConnectionListener(String name, JMSConnectionEventListener listener) {
+       Set connectionListeners = (Set) m_listeners.get(name);
+       if (connectionListeners == null) {
+           connectionListeners = new HashSet();
+           m_listeners.put(name, connectionListeners);
+       }
+       connectionListeners.add(listener);
+    }
+
+    public synchronized void removeConnectionListener(String name, JMSConnectionEventListener listener) {
+        Set connectionListeners = (Set) m_listeners.get(name);
+        if (connectionListeners != null) {
+            connectionListeners.remove(listener);
+        }
+     }
+
     // ---------------------------------------------------- Implementation
 
-    private InitialContext createInitialContext(Properties properties) throws NamingException {
-        if (properties != null) {
-            return new InitialContext(properties);
+    Connection createConnection(ConnectionConfiguration cc) throws NamingException, JMSException {
+        try {
+            final InitialContext context = createInitialContext(cc.getJNDIProperties());
+            final ConnectionFactory factory = (ConnectionFactory) context.lookup(cc.getConnectionFactory());
+            final Connection connection = createConnection(factory, cc);
+            if (cc.isAutoReconnect()) {
+                connection.setExceptionListener(new ReconnectionListener(this, cc));
+            }
+            return connection;
+        }
+        catch (NamingException e) {
+            if (getLogger().isWarnEnabled()) {
+                final Throwable rootCause = e.getRootCause();
+                if (rootCause != null) {
+                    String message = e.getRootCause().getMessage();
+                    if (rootCause instanceof ClassNotFoundException) {
+                        String info = "WARN! *** JMS block is installed but jms client library not found. ***\n" + 
+                            "- For the jms block to work you must install and start a JMS server and " +
+                            "place the client jar in WEB-INF/lib.";
+                            if (message.indexOf("exolab") > 0 ) {
+                                info += "\n- The default server, OpenJMS is configured in cocoon.xconf but is not bundled with Cocoon.";
+                            }
+                        System.err.println(info);
+                        getLogger().warn(info,e);
+                    } else {
+                        System.out.println(message);
+                        getLogger().warn("Cannot get Initial Context. Is the JNDI server reachable?",e);
+                    }
+                }
+                else {
+                    getLogger().warn("Failed to initialize JMS.",e);
+                }
+            }
+            throw e;
         }
-        return new InitialContext();
     }
 
     private Connection createConnection(ConnectionFactory factory, ConnectionConfiguration cc) throws JMSException {
@@ -242,7 +292,73 @@
         return null;
     }
 
-    private static final class ConnectionConfiguration {
+    private InitialContext createInitialContext(Properties properties) throws NamingException {
+        if (properties != null) {
+            return new InitialContext(properties);
+        }
+        return new InitialContext();
+    }
+    
+    synchronized void removeConnection(String name) {
+        notifyListenersOfDisconnection(name);
+        final Connection connection = (Connection) m_connections.remove(name);
+        stopConnection(name, connection);
+    }
+    
+    synchronized void addConnection(String name, Connection connection) {
+        m_connections.put(name, connection);
+        notifyListenersOfConnection(name);
+    }
+    
+    void scheduleReconnectionJob(ConnectionConfiguration configuration) {
+        if (getLogger().isInfoEnabled()) {
+            getLogger().info("Scheduling JMS reconnection job for: " + configuration.getName());
+        }
+        JobScheduler scheduler = null;
+        try {
+            scheduler = (JobScheduler) m_serviceManager.lookup(JobScheduler.ROLE);
+            Date executionTime = new Date(System.currentTimeMillis() + configuration.getAutoReconnectDelay());
+            ReconnectionJob job = new ReconnectionJob(this, configuration);
+            scheduler.fireJobAt(executionTime, "reconnect_" + configuration.getName(), job);
+        }
+        catch (ServiceException e) {
+            if (getLogger().isWarnEnabled()) {
+                getLogger().warn("Cannot obtain scheduler.",e);
+            }
+        }
+        catch (CascadingException e) {
+            if (getLogger().isWarnEnabled()) {
+                getLogger().warn("Unable to schedule reconnection job.",e);
+            }
+        }
+        finally {
+            if (scheduler != null) {
+                m_serviceManager.release(scheduler);
+            }
+        }
+    }
+    
+    private void notifyListenersOfConnection(String name) {
+        Set connectionListeners = (Set) m_listeners.get(name);
+        if (connectionListeners != null) {
+            for (Iterator listenersIterator = connectionListeners.iterator(); listenersIterator.hasNext();) {
+                JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator.next();
+                listener.onConnection(name);
+            }
+        }
+    }
+    
+    private void notifyListenersOfDisconnection(String name) {
+        Set connectionListeners = (Set) m_listeners.get(name);
+        if (connectionListeners != null) {
+            for (Iterator listenersIterator = connectionListeners.iterator(); listenersIterator.hasNext();) {
+                JMSConnectionEventListener listener = (JMSConnectionEventListener) listenersIterator.next();
+                listener.onDisconnection(name);
+            }
+        }
+    }
+
+    static final class ConnectionConfiguration {
         
         // ------------------------------------------------ Instance variables
 
@@ -251,27 +367,28 @@
         private final String m_connectionFactory;
         private final String m_username;
         private final String m_password;
-        private Properties m_jndiProperties;
+        private final boolean m_autoReconnect;
+        private final int m_autoReconnectDelay;
+        
+        private Properties m_jndiProperties = new Properties();
 
-        private ConnectionConfiguration(String name, Parameters parameters, int type) 
+        ConnectionConfiguration(String name, Parameters parameters, int type) 
         throws ConfigurationException {
             m_name = name;
             try {
                 m_connectionFactory = parameters.getParameter(CONNECTION_FACTORY_PARAM);
                 m_username = parameters.getParameter(USERNAME_PARAM, null);
                 m_password = parameters.getParameter(PASSWORD_PARAM, null);
+                m_autoReconnect = parameters.getParameterAsBoolean(AUTO_RECONNECT_PARAM, false);
+                m_autoReconnectDelay = parameters.getParameterAsInteger(AUTO_RECONNECT_DELAY_PARAM, DEFAULT_AUTO_RECONNECT_DELAY);
                 
                 // parse the jndi property parameters
                 String[] names = parameters.getNames();
                 for (int i = 0; i < names.length; i++) {
                     if (names[i].startsWith(JNDI_PROPERTY_PREFIX)) {
-                        if (m_jndiProperties == null) {
-                            m_jndiProperties = new Properties();
-                        }
                         m_jndiProperties.put(names[i], parameters.getParameter(names[i]));
                     }
                 }
-                
             }
             catch (ParameterException e) {
                 throw new ConfigurationException(e.getLocalizedMessage());
@@ -279,34 +396,98 @@
             m_type = type;
         }
 
-        private String getName() {
+        String getName() {
             return m_name;
         }
 
-        private int getType() {
+        int getType() {
             return m_type;
         }
 
-        private Properties getJNDIProperties() {
+        Properties getJNDIProperties() {
             return m_jndiProperties;
         }
 
-        private String getConnectionFactory() {
+        String getConnectionFactory() {
             return m_connectionFactory;
         }
 
-        private String getUserName() {
+        String getUserName() {
             return m_username;
         }
 
-        private String getPassword() {
+        String getPassword() {
             return m_password;
         }
+        
+        boolean isAutoReconnect() {
+            return m_autoReconnect;
+        }
+        
+        int getAutoReconnectDelay() {
+            return m_autoReconnectDelay;
+        }
 
         public int hashCode() {
             return m_name.hashCode();
         }
 
+    }
+
+    static final class ReconnectionListener implements ExceptionListener {
+
+        private final JMSConnectionManagerImpl m_manager;
+        private final ConnectionConfiguration m_configuration;
+        
+        ReconnectionListener(JMSConnectionManagerImpl manager, ConnectionConfiguration configuration) {
+            super();
+            m_manager = manager;
+            m_configuration = configuration;
+        }
+
+        public void onException(JMSException exception) {
+            m_manager.removeConnection(m_configuration.getName());
+            m_manager.scheduleReconnectionJob(m_configuration);
+        }
+
+    }
+
+    static final class ReconnectionJob implements CronJob {
+
+        private final JMSConnectionManagerImpl m_manager;        
+        private final ConnectionConfiguration m_configuration;
+
+        ReconnectionJob(JMSConnectionManagerImpl manager, ConnectionConfiguration configuration) {
+            super();
+            m_manager = manager;
+            m_configuration = configuration;
+        }
+        
+        public void execute(String jobname) {
+            final Logger logger = m_manager.getLogger();
+            if (logger.isInfoEnabled()) {
+                logger.info("Reconnecting JMS connection: " + m_configuration.getName());
+            }
+            try {
+                final Connection connection = m_manager.createConnection(m_configuration);
+                m_manager.addConnection(m_configuration.getName(), connection);
+                if (logger.isInfoEnabled()) {
+                    logger.info("Successfully reconnected JMS connection: " + m_configuration.getName());
+                }
+            }
+            catch (NamingException e) {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Failed to reconnect.",e);
+                }
+                m_manager.scheduleReconnectionJob(m_configuration);
+            }
+            catch (JMSException e) {
+                if (logger.isWarnEnabled()) {
+                    logger.warn("Failed to reconnect.",e);
+                }
+                m_manager.scheduleReconnectionJob(m_configuration);
+            }
+        }
     }
 
 }

Modified: cocoon/trunk/gump.xml
URL: http://svn.apache.org/viewcvs/cocoon/trunk/gump.xml?rev=191126&r1=191125&r2=191126&view=diff
==============================================================================
--- cocoon/trunk/gump.xml (original)
+++ cocoon/trunk/gump.xml Fri Jun 17 03:42:11 2005
@@ -1173,6 +1173,7 @@
     <depend project="cocoon" inherit="all"/>
     <!-- commented out because of circular dependency -->
     <!--depend project="cocoon-block-eventcache" type="samples"/-->
+    <depend project="cocoon-block-cron"/>
     <depend project="cocoon-block-databases" type="samples"/>
     <depend project="cocoon-block-hsqldb" inherit="all" type="samples"/>
     <depend project="jms"/>

Modified: cocoon/trunk/status.xml
URL: http://svn.apache.org/viewcvs/cocoon/trunk/status.xml?rev=191126&r1=191125&r2=191126&view=diff
==============================================================================
--- cocoon/trunk/status.xml (original)
+++ cocoon/trunk/status.xml Fri Jun 17 03:42:11 2005
@@ -197,6 +197,14 @@
 
   <changes>
   <release version="@version@" date="@date@">
+    <action dev="UH" type="add" due-to="Johan Stuyts">
+      JMS block: Connection failures can now be recovered from. The default JMSConnectionManager implementation
+      detects when JMS connections are severed and schedules reconnection attempts with the cron scheduler
+      in order to re-establish the connection. As soon as reconnection succeeds, it notifies its listeners
+      so that they may recover as well and refresh their JMS sessions. The provided abstract MessageListener
+      and JMS publisher that users are recommended to use as base classes for their concrete JMS needs have been
+      updated to use this mechanism.
+    </action>
     <action dev="SW" type="add">
       CForms block: new Tree widget, heavily inspired by Swing's JTree. Features Ajax, selection listeners,
       and a lightweight data model with two implementations: a generic one, and a source-based one to build