You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2002/11/08 15:54:26 UTC

cvs commit: jakarta-commons-sandbox/messenger project.xml

jstrachan    2002/11/08 06:54:26

  Modified:    messenger/src/java/org/apache/commons/messenger
                        MessengerDigester.java DefaultMessenger.java
                        Messenger.java
               messenger/src/java/org/apache/commons/messagelet
                        ConsumerThread.java
               messenger project.xml
  Added:       messenger/src/java/org/apache/commons/messenger
                        XAMessenger.java XACapableAdapter.java
                        XACapable.java
               messenger/src/java/org/apache/commons/messagelet
                        XAConsumerThread.java
  Log:
  Added support for XA message consumption.
  
  Revision  Changes    Path
  1.5       +42 -23    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerDigester.java
  
  Index: MessengerDigester.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerDigester.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- MessengerDigester.java	13 Nov 2001 12:46:10 -0000	1.4
  +++ MessengerDigester.java	8 Nov 2002 14:54:25 -0000	1.5
  @@ -22,6 +22,7 @@
       // default implementation classes
       private String messengerManagerClass = "org.apache.commons.messenger.MessengerManager";
       private String messengerClass = "org.apache.commons.messenger.DefaultMessenger";
  +    private String xaMessengerClass = "org.apache.commons.messenger.XAMessenger";
       private String sessionFactory = "org.apache.commons.messenger.SessionFactory";
       private String connectionFactory = "org.apache.commons.messenger.DummyConnectionFactory";
       private String jndiSessionFactory = "org.apache.commons.messenger.JNDISessionFactory";
  @@ -46,39 +47,57 @@
           
           addObjectCreate( "manager", messengerManagerClass, "className" );
           addSetProperties( "manager" );
  -    
  -        addObjectCreate( "manager/messenger", messengerClass, "className" );
  -        addSetProperties( "manager/messenger" );
  +
  +		String path = "manager/messenger";    
  +        addObjectCreate( path, messengerClass, "className" );
  +        addSetProperties( path );
           
  -        addSetNext( "manager/messenger", "addMessenger",
  +        addSetNext( path, "addMessenger",
              "org.apache.commons.messenger.Messenger"
           );        
  +
  +		addMessengerPaths(path);
  +		
  +		path = "manager/xaMessenger";    
  +        addObjectCreate( path, xaMessengerClass, "className" );
  +        addSetProperties( path );
           
  -        addObjectCreate( "manager/messenger/factory", sessionFactory, "className" );
  -        addSetProperties( "manager/messenger/factory" );
  -        addSetNext( "manager/messenger/factory", "setSessionFactory",
  +        addSetNext( path, "addMessenger",
  +           "org.apache.commons.messenger.Messenger"
  +        );        
  +
  +		addMessengerPaths(path);
  +    }
  +    
  +    protected void addMessengerPaths(String root) {
  +
  +		String path = root + "/factory";        
  +        addObjectCreate( path, sessionFactory, "className" );
  +        addSetProperties( path );
  +        addSetNext( path, "setSessionFactory",
              "org.apache.commons.messenger.SessionFactory"
           );
  -        addCallMethod( "manager/messenger/factory/property", "addProperty", 2);
  -        addCallParam( "manager/messenger/factory/property/name", 0 );
  -        addCallParam( "manager/messenger/factory/property/value", 1 );
  -        
  -        addObjectCreate( "manager/messenger/factory/connectionFactory", connectionFactory, "className" );
  -        addSetProperties( "manager/messenger/factory/connectionFactory" );
  -        addSetNext( "manager/messenger/factory/connectionFactory", "setConnectionFactory",
  +        addCallMethod( path + "/property", "addProperty", 2);
  +        addCallParam( path + "/property/name", 0 );
  +        addCallParam( path + "/property/value", 1 );
  +        
  +        path = root + "/factory/connectionFactory";       
  +        addObjectCreate( path, connectionFactory, "className" );
  +        addSetProperties( path );
  +        addSetNext( path, "setConnectionFactory",
              "javax.jms.ConnectionFactory"
           );
   
  +        path = root + "/jndi";        
  +        addObjectCreate( path, jndiSessionFactory, "className" );
  +        addSetProperties( path );
           
  -        addObjectCreate( "manager/messenger/jndi", jndiSessionFactory, "className" );
  -        addSetProperties( "manager/messenger/jndi" );
  -        
  -        addSetNext( "manager/messenger/jndi", "setSessionFactory",
  +        addSetNext( path, "setSessionFactory",
              "org.apache.commons.messenger.SessionFactory"
           );
                   
  -        addCallMethod( "manager/messenger/jndi/property", "addProperty", 2);
  -        addCallParam( "manager/messenger/jndi/property/name", 0 );
  -        addCallParam( "manager/messenger/jndi/property/value", 1 );
  +        addCallMethod( path + "/property", "addProperty", 2);
  +        addCallParam( path + "/property/name", 0 );
  +        addCallParam( path + "/property/value", 1 );
       }
   }
  
  
  
  1.13      +6 -2      jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
  
  Index: DefaultMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- DefaultMessenger.java	12 Jul 2002 07:41:16 -0000	1.12
  +++ DefaultMessenger.java	8 Nov 2002 14:54:25 -0000	1.13
  @@ -87,6 +87,10 @@
               messengerSessionPool = new ThreadLocal();
           }
       }
  +
  +    public Session getSession() throws JMSException {
  +        return getMessengerSession().getSession();
  +    }
       
       // Implementation methods
       //-------------------------------------------------------------------------
  
  
  
  1.14      +9 -2      jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java
  
  Index: Messenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- Messenger.java	6 Sep 2002 11:53:00 -0000	1.13
  +++ Messenger.java	8 Nov 2002 14:54:25 -0000	1.14
  @@ -21,6 +21,7 @@
   import javax.jms.MessageListener;
   import javax.jms.ObjectMessage;
   import javax.jms.QueueBrowser;
  +import javax.jms.Session;
   import javax.jms.StreamMessage;
   import javax.jms.ServerSessionPool;
   import javax.jms.TextMessage;
  @@ -108,6 +109,12 @@
   
       /** Returns the underlying JMS connection that this Messenger is using */
       public Connection getConnection() throws JMSException;
  +
  +	/** 
  +	 * Returns the underlying JMS session that this thread is using for
  +	 * this Messenger for synchronous operation
  +	 */ 
    public Session getSession() throws JMSException;
  +
   
       /** Creates a ConnectionConsumer which is useful if used inside an application server
        * to associate multiple threads with consuming from a JMS destination */
  
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/XAMessenger.java
  
  Index: XAMessenger.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   *
   * $Id: DefaultMessenger.java,v 1.12 2002/07/12 07:41:16 jstrachan Exp $
   */
  package org.apache.commons.messenger;
  
  import javax.jms.Session;
  import javax.jms.XASession;
  import javax.transaction.Transaction;
  import javax.transaction.xa.XAResource;
  
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  
  /** <p><code>XAMessenger</code> is a default implementation of
   * Messenger which can also support XA transactions by enlisting and delisting
   * XAResources.
   * This is implemented as a seperate Messenger implementation to avoid the core
   * Messenger having a dependency on JTA.
   * .</p>
   *
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
   * @version $Revision: 1.12 $
   */
  public class XAMessenger extends DefaultMessenger implements XACapable {
      
      /** Logger */
      private static final Log log = LogFactory.getLog(XAMessenger.class);
      
      public XAMessenger() {
      }
      
      // XACapable interface
      //-------------------------------------------------------------------------
          
  	public void enlistResources(Transaction transaction) throws Exception {
  		XAResource resource = getXAResource();
  		if (resource != null) {
  			transaction.enlistResource(resource);
  		}
  	}
  
  	public void delistResources(Transaction transaction, int flag) throws Exception {
  		XAResource resource = getXAResource();
  		if (resource != null) {
  			transaction.delistResource(resource, flag);
  		}
  	}
  
      // Implementation methods
      //-------------------------------------------------------------------------
      
      /**
       * @return the XAResource associated with this Messenger if one exists
     */
  	protected XAResource getXAResource() throws Exception {
  		Session session = getMessengerSession().getSession();
  		if (session instanceof XASession) {
  			XASession xaSession = (XASession) session;
  			return xaSession.getXAResource();
  		}
  		return null;
  	}
  }
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/XACapableAdapter.java
  
  Index: XACapableAdapter.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   * 
   * $Id: MessageDrivenObject.java,v 1.2 2002/05/17 15:05:46 jstrachan Exp $
   */
  package org.apache.commons.messenger;
  
  import javax.jms.Session;
  import javax.jms.XASession;
  import javax.transaction.Transaction;
  import javax.transaction.xa.XAResource;
  
  /** 
   * <p><code>XACapableAdapter</code> is an adapter that implements
   * XACapable for a given Messenger 
   * </p>
   *
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
   * @version $Revision: 1.2 $
   */
  public class XACapableAdapter implements XACapable {
  
  	private Messenger messenger;
  	
  	public XACapableAdapter(Messenger messenger) {
  		this.messenger = messenger;
  	}
      
      // XACapable interface
      //-------------------------------------------------------------------------
          
  	public void enlistResources(Transaction transaction) throws Exception {
  		XAResource resource = getXAResource();
  		if (resource != null) {
  			transaction.enlistResource(resource);
  		}
  	}
  
  	public void delistResources(Transaction transaction, int flag) throws Exception {
  		XAResource resource = getXAResource();
  		if (resource != null) {
  			transaction.delistResource(resource, flag);
  		}
  	}
  
      // Implementation methods
      //-------------------------------------------------------------------------
      
      /**
       * @return the XAResource associated with this Messenger if one exists
       */
  	protected XAResource getXAResource() throws Exception {
  		Session session = messenger.getSession();
  		if (session instanceof XASession) {
  			XASession xaSession = (XASession) session;
  			return xaSession.getXAResource();
  		}
  		return null;
  	}
  }
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/XACapable.java
  
  Index: XACapable.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   * 
   * $Id: MessageDrivenObject.java,v 1.2 2002/05/17 15:05:46 jstrachan Exp $
   */
  package org.apache.commons.messenger;
  
  import javax.transaction.Transaction;
  
  /** 
   * <p><code>XACapable</code> is an object (typically a MessageListener in this context)
   * which can be part of an XA transaction.
   * This just means that this object has a way of providing XA resources.</p>
   *
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
   * @version $Revision: 1.2 $
   */
  public interface XACapable {
  
  	/**
  	 * This method is called to enlist any XA resources the given object 
  	 * has to be part of the XA transaction.
  	 * 
  	 * @param transaction the transaction to enlist to 
  	 */	
  	public void enlistResources(Transaction transaction) throws Exception;
  
  	/**
  	 * This method is called to delist any XA resources the given object 
  	 * has previously enlisted to this XA transaction.
  	 * 
  	 * @param transaction the transaction to delist resources from
  	 * @param flag is the flag used by JTA when delisting resources.
  	 * It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL
  	 */	
  	public void delistResources(Transaction transaction, int flag) throws Exception;
  }
  
  
  
  
  1.2       +8 -2      jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ConsumerThread.java
  
  Index: ConsumerThread.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ConsumerThread.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ConsumerThread.java	21 Oct 2002 20:31:27 -0000	1.1
  +++ ConsumerThread.java	8 Nov 2002 14:54:26 -0000	1.2
  @@ -64,7 +64,13 @@
           }
           
           while (! isShouldStop()) {
  -            startTransaction();
  +        	try {
  +	            startTransaction();
  +        	}
  +        	catch (Exception e) {
  +        		log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e);
  +        		break;
  +        	}
   
               try {
                   Message message = receive();
  @@ -232,7 +238,7 @@
        * Strategy method to represent the code required to start
        * a transaction.
        */
  -    protected void startTransaction() {
  +    protected void startTransaction() throws Exception {
       }
   
       /**
  
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/XAConsumerThread.java
  
  Index: XAConsumerThread.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   * 
   * $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $
   */
  package org.apache.commons.messagelet;
  
  import javax.jms.Message;
  import javax.jms.MessageConsumer;
  import javax.jms.MessageListener;
  import javax.jms.JMSException;
  import javax.jms.Session;
  import javax.jms.XASession;
  
  import javax.transaction.SystemException;
  import javax.transaction.Transaction;
  import javax.transaction.TransactionManager;
  import javax.transaction.xa.XAResource;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  
  import org.apache.commons.messenger.Messenger;
  import org.apache.commons.messenger.XACapable;
  import org.apache.commons.messenger.XACapableAdapter;
  
  /** 
   * <p><code>XAConsumerThread</code> is a thread which will perform XA processing
   * of messages
   *
   * @author damon.hamacha
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
   * @version $Revision: 1.12 $
   */
  public class XAConsumerThread extends ConsumerThread {
  
  	/** Logger */
  	private static final Log log = LogFactory.getLog(XAConsumerThread.class);
  
  	private TransactionManager transctionManager;
  	private Transaction transaction;
  
  	public XAConsumerThread() {
  		setName("XAConsumer" + getName());
  	}
  
  	/**
  	 * @return the TransactionManager to be used
	 * @throws SystemException
	 */
  	public TransactionManager getTransactionManager() throws SystemException {
  		if (transctionManager == null) {
  			transctionManager = createTransactionManager();
  		}
  		return transctionManager;
  	}
  
  	/**
  	 * Sets the transaction manager to be used
  	 * 
	 * @param transctionManager the transaction manager to be used
	 */
  	public void setTransactionManager(TransactionManager transctionManager) {
  		this.transctionManager = transctionManager;
  	}
  
  	// Implementation methods
      //-------------------------------------------------------------------------    
  	
  	/**
  	 * Factory method to create a TransactionManager via some mechanism.
  	 * By default this mechanism will lookup in JNDI 
	 */
  	protected TransactionManager createTransactionManager() throws SystemException {
  		return null;
  	}
  	
  			
  	/**
  	 * Enlists any resources with the current transaction.
  	 * Typically the input Messenger's Session will always be
  	 * enlisted. Then if the current MessageListener implements XACapable
  	 * then any of its resources will also be enlisted.
  	 * 
  	 * @param transaction the transaction to enlist resources with
  	 * @throws Exception if the enlistment fails for whatever reason
  	 */
  	protected void enlist(Transaction transaction) throws Exception {
  		XACapable xaCapable = getXACapable( getMessenger() );
  		xaCapable.enlistResources(transaction);
  	
  		MessageListener listener = getListener();
  		if (listener instanceof XACapable) {
  			xaCapable = (XACapable) listener;
  			xaCapable.enlistResources(transaction);
  		}
  	}
  		
  
  
  	/**
  	 * Delists any resources from the current transaction.
  	 * This includes the current input Messenger's Session as well
  	 * as any resources used by the MessageListener if it implements
  	 * XACapable
  	 * 
  	 * @param transaction
  	 * @param flag is the flag used by JTA when delisting resources.
  	 * It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL
  	 * @throws Exception
  	 */
  	protected void delist(Transaction transaction, int flag) throws Exception {
  		XACapable xaCapable = getXACapable( getMessenger() );
  		xaCapable.delistResources(transaction, flag);
  		
  		MessageListener listener = getListener();
  		if (listener instanceof XACapable) {
  			xaCapable = (XACapable) listener;
  			xaCapable.delistResources(transaction, flag);
  		}
  	}
  
  	/**
  	 * Strategy method to represent the code required to start
  	 * a transaction.
  	 */
  	protected void startTransaction() throws Exception {
  		getTransactionManager().begin();
  		transaction = getTransactionManager().getTransaction();
  
  		enlist(transaction);
  	}
  
  	/**
  	 * Strategy method to represent the code required to commit
  	 * a transaction.
  	 */
  	protected void commitTransaction() throws Exception {
  		transaction.commit();
  		delist(transaction, XAResource.TMSUCCESS);
  	}
  
  	/**
  	 * Strategy method to represent the code required to rollback
  	 * a transaction.
  	 */
  	protected void rollbackTransaction() throws Exception {
  		transaction.rollback();
  		delist(transaction, XAResource.TMFAIL);
  	}
  
  	/**
  	* Strategy method to represent the code required to rollback
  	* a transaction.
  	*/
  	protected void rollbackOnlyTransaction(Exception e) throws Exception {
  		transaction.setRollbackOnly();
  		log.error(e);
  		delist(transaction, XAResource.TMFAIL);
  	}
  
  	/**
  	 * Strategy method to represent the code required to cancel
  	 * a transaction. 
  	 * This is called when a message is not received.
  	 */
  	protected void cancelTransaction() throws Exception {
  		transaction.rollback();
  		delist(transaction, XAResource.TMFAIL);
  	}
  
  	/**
  	 * @return an XACapable for the given Messenger
	 */
  	protected XACapable getXACapable(Messenger messenger) {		
  		if (messenger instanceof XACapable) {
  			return (XACapable) messenger;
  		}
  		return new XACapableAdapter(messenger);
  	}
  }
  
  
  
  1.11      +6 -15     jakarta-commons-sandbox/messenger/project.xml
  
  Index: project.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/project.xml,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- project.xml	2 Oct 2002 15:00:04 -0000	1.10
  +++ project.xml	8 Nov 2002 14:54:26 -0000	1.11
  @@ -4,7 +4,7 @@
     <pomVersion>3</pomVersion>
     <name>commons-messenger</name>
     <id>commons-messenger</id>
  -  <currentVersion>1.0-dev-8</currentVersion>
  +  <currentVersion>1.0-dev-9</currentVersion>
     <organization>
   	  <name>Apache Software Foundation</name>
   	  <url>http://www.apache.org</url>
  @@ -72,56 +72,47 @@
       
       <dependency>
         <id>commons-logging</id>
  -      <type>required</type>
         <version>1.0</version>
  -      <jar>commons-logging-1.0.jar</jar>
       </dependency>
   
       <dependency>
         <id>commons-beanutils</id>
  -      <type>required</type>
         <version>1.3</version>
  -      <jar>commons-beanutils-1.3.jar</jar>
       </dependency>
   
       <dependency>
         <id>commons-collections</id>
  -      <type>required</type>
         <version>2.0</version>
  -      <jar>commons-collections-2.0.jar</jar>
       </dependency>
   
       <dependency>
         <id>commons-digester</id>
  -      <type>required</type>
         <version>1.2</version>
  -      <jar>commons-digester-1.2.jar</jar>
       </dependency>
   
       <dependency>
         <id>servletapi</id>
  -      <type>required</type>
         <version>2.3</version>
  -      <jar>servletapi-2.3.jar</jar>
       </dependency>
   
       <dependency>
         <id>jms</id>
  -      <type>required</type>
         <version>1.0.2b</version>
  -      <jar>jms-1.0.2b.jar</jar>
  +    </dependency>
  +
  +    <dependency>
  +      <id>jta</id>
  +      <version>1.0.1</version>
       </dependency>
   
       <dependency>
         <id>xml-apis</id>
  -      <type>required</type>
         <version>2.0.0</version>
       </dependency>
   
       <dependency>
         <id>ant</id>
         <version>1.4.1</version>
  -      <jar>ant-1.4.1.jar</jar>
       </dependency>
   
   <!-- runtime dependencies only required for testing and sample programs -->
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>