You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2002/11/08 15:54:26 UTC
cvs commit: jakarta-commons-sandbox/messenger project.xml
jstrachan 2002/11/08 06:54:26
Modified: messenger/src/java/org/apache/commons/messenger
MessengerDigester.java DefaultMessenger.java
Messenger.java
messenger/src/java/org/apache/commons/messagelet
ConsumerThread.java
messenger project.xml
Added: messenger/src/java/org/apache/commons/messenger
XAMessenger.java XACapableAdapter.java
XACapable.java
messenger/src/java/org/apache/commons/messagelet
XAConsumerThread.java
Log:
Added support for XA message consumption.
Revision Changes Path
1.5 +42 -23 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerDigester.java
Index: MessengerDigester.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerDigester.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- MessengerDigester.java 13 Nov 2001 12:46:10 -0000 1.4
+++ MessengerDigester.java 8 Nov 2002 14:54:25 -0000 1.5
@@ -22,6 +22,7 @@
// default implementation classes
private String messengerManagerClass = "org.apache.commons.messenger.MessengerManager";
private String messengerClass = "org.apache.commons.messenger.DefaultMessenger";
+ private String xaMessengerClass = "org.apache.commons.messenger.XAMessenger";
private String sessionFactory = "org.apache.commons.messenger.SessionFactory";
private String connectionFactory = "org.apache.commons.messenger.DummyConnectionFactory";
private String jndiSessionFactory = "org.apache.commons.messenger.JNDISessionFactory";
@@ -46,39 +47,57 @@
addObjectCreate( "manager", messengerManagerClass, "className" );
addSetProperties( "manager" );
-
- addObjectCreate( "manager/messenger", messengerClass, "className" );
- addSetProperties( "manager/messenger" );
+
+ String path = "manager/messenger";
+ addObjectCreate( path, messengerClass, "className" );
+ addSetProperties( path );
- addSetNext( "manager/messenger", "addMessenger",
+ addSetNext( path, "addMessenger",
"org.apache.commons.messenger.Messenger"
);
+
+ addMessengerPaths(path);
+
+ path = "manager/xaMessenger";
+ addObjectCreate( path, xaMessengerClass, "className" );
+ addSetProperties( path );
- addObjectCreate( "manager/messenger/factory", sessionFactory, "className" );
- addSetProperties( "manager/messenger/factory" );
- addSetNext( "manager/messenger/factory", "setSessionFactory",
+ addSetNext( path, "addMessenger",
+ "org.apache.commons.messenger.Messenger"
+ );
+
+ addMessengerPaths(path);
+ }
+
+ protected void addMessengerPaths(String root) {
+
+ String path = root + "/factory";
+ addObjectCreate( path, sessionFactory, "className" );
+ addSetProperties( path );
+ addSetNext( path, "setSessionFactory",
"org.apache.commons.messenger.SessionFactory"
);
- addCallMethod( "manager/messenger/factory/property", "addProperty", 2);
- addCallParam( "manager/messenger/factory/property/name", 0 );
- addCallParam( "manager/messenger/factory/property/value", 1 );
-
- addObjectCreate( "manager/messenger/factory/connectionFactory", connectionFactory, "className" );
- addSetProperties( "manager/messenger/factory/connectionFactory" );
- addSetNext( "manager/messenger/factory/connectionFactory", "setConnectionFactory",
+ addCallMethod( path + "/property", "addProperty", 2);
+ addCallParam( path + "/property/name", 0 );
+ addCallParam( path + "/property/value", 1 );
+
+ path = root + "/factory/connectionFactory";
+ addObjectCreate( path, connectionFactory, "className" );
+ addSetProperties( path );
+ addSetNext( path, "setConnectionFactory",
"javax.jms.ConnectionFactory"
);
+ path = root + "/jndi";
+ addObjectCreate( path, jndiSessionFactory, "className" );
+ addSetProperties( path );
- addObjectCreate( "manager/messenger/jndi", jndiSessionFactory, "className" );
- addSetProperties( "manager/messenger/jndi" );
-
- addSetNext( "manager/messenger/jndi", "setSessionFactory",
+ addSetNext( path, "setSessionFactory",
"org.apache.commons.messenger.SessionFactory"
);
- addCallMethod( "manager/messenger/jndi/property", "addProperty", 2);
- addCallParam( "manager/messenger/jndi/property/name", 0 );
- addCallParam( "manager/messenger/jndi/property/value", 1 );
+ addCallMethod( path + "/property", "addProperty", 2);
+ addCallParam( path + "/property/name", 0 );
+ addCallParam( path + "/property/value", 1 );
}
}
1.13 +6 -2 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
Index: DefaultMessenger.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- DefaultMessenger.java 12 Jul 2002 07:41:16 -0000 1.12
+++ DefaultMessenger.java 8 Nov 2002 14:54:25 -0000 1.13
@@ -87,6 +87,10 @@
messengerSessionPool = new ThreadLocal();
}
}
+
+ public Session getSession() throws JMSException {
+ return getMessengerSession().getSession();
+ }
// Implementation methods
//-------------------------------------------------------------------------
1.14 +9 -2 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java
Index: Messenger.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- Messenger.java 6 Sep 2002 11:53:00 -0000 1.13
+++ Messenger.java 8 Nov 2002 14:54:25 -0000 1.14
@@ -21,6 +21,7 @@
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.QueueBrowser;
+import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.ServerSessionPool;
import javax.jms.TextMessage;
@@ -108,6 +109,12 @@
/** Returns the underlying JMS connection that this Messenger is using */
public Connection getConnection() throws JMSException;
+
+ /**
+ * Returns the underlying JMS session that this thread is using for
+ * this Messenger for synchronous operation
+ */
public Session getSession() throws JMSException;
+
/** Creates a ConnectionConsumer which is useful if used inside an application server
* to associate multiple threads with consuming from a JMS destination */
1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/XAMessenger.java
Index: XAMessenger.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
* $Id: DefaultMessenger.java,v 1.12 2002/07/12 07:41:16 jstrachan Exp $
*/
package org.apache.commons.messenger;
import javax.jms.Session;
import javax.jms.XASession;
import javax.transaction.Transaction;
import javax.transaction.xa.XAResource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * <p><code>XAMessenger</code> is a default implementation of Messenger
 * which can also take part in XA transactions by enlisting and delisting
 * the XAResource of its JMS session.</p>
 *
 * <p>This is implemented as a separate Messenger implementation to avoid
 * the core Messenger having a dependency on JTA.</p>
 *
 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 * @version $Revision: 1.12 $
 */
public class XAMessenger extends DefaultMessenger implements XACapable {

    /** Logger */
    private static final Log log = LogFactory.getLog(XAMessenger.class);

    public XAMessenger() {
    }

    // XACapable interface
    //-------------------------------------------------------------------------

    /**
     * Enlists the XAResource of this Messenger's session with the given
     * transaction. Does nothing when no XAResource is available.
     *
     * @param transaction the transaction to enlist with
     * @throws Exception if the resource lookup or the enlistment fails
     */
    public void enlistResources(Transaction transaction) throws Exception {
        XAResource xaResource = getXAResource();
        if (xaResource == null) {
            return;
        }
        transaction.enlistResource(xaResource);
    }

    /**
     * Delists the XAResource of this Messenger's session from the given
     * transaction. Does nothing when no XAResource is available.
     *
     * @param transaction the transaction to delist from
     * @param flag the JTA delist flag, passed through unchanged
     * @throws Exception if the resource lookup or the delistment fails
     */
    public void delistResources(Transaction transaction, int flag) throws Exception {
        XAResource xaResource = getXAResource();
        if (xaResource == null) {
            return;
        }
        transaction.delistResource(xaResource, flag);
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Looks up the session via <code>getMessengerSession()</code> and, if it
     * is an XASession, asks it for its XAResource.
     *
     * @return the XAResource associated with this Messenger, or null if the
     *  session is not an XASession
     */
    protected XAResource getXAResource() throws Exception {
        Session current = getMessengerSession().getSession();
        return (current instanceof XASession)
            ? ((XASession) current).getXAResource()
            : null;
    }
}
1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/XACapableAdapter.java
Index: XACapableAdapter.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
* $Id: MessageDrivenObject.java,v 1.2 2002/05/17 15:05:46 jstrachan Exp $
*/
package org.apache.commons.messenger;
import javax.jms.Session;
import javax.jms.XASession;
import javax.transaction.Transaction;
import javax.transaction.xa.XAResource;
/**
 * <p><code>XACapableAdapter</code> wraps a Messenger so that it can be
 * used through the XACapable interface. The XAResource is taken from the
 * session returned by the wrapped Messenger's <code>getSession()</code>.</p>
 *
 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 * @version $Revision: 1.2 $
 */
public class XACapableAdapter implements XACapable {

    /** The Messenger whose session supplies the XAResource */
    private final Messenger messenger;

    /**
     * @param messenger the Messenger to adapt
     */
    public XACapableAdapter(Messenger messenger) {
        this.messenger = messenger;
    }

    // XACapable interface
    //-------------------------------------------------------------------------

    /**
     * Enlists the wrapped Messenger's XAResource, when it has one, with the
     * given transaction.
     *
     * @param transaction the transaction to enlist with
     * @throws Exception if the resource lookup or the enlistment fails
     */
    public void enlistResources(Transaction transaction) throws Exception {
        XAResource found = getXAResource();
        if (found != null) {
            transaction.enlistResource(found);
        }
    }

    /**
     * Delists the wrapped Messenger's XAResource, when it has one, from the
     * given transaction.
     *
     * @param transaction the transaction to delist from
     * @param flag the JTA delist flag, passed through unchanged
     * @throws Exception if the resource lookup or the delistment fails
     */
    public void delistResources(Transaction transaction, int flag) throws Exception {
        XAResource found = getXAResource();
        if (found != null) {
            transaction.delistResource(found, flag);
        }
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * @return the XAResource of the wrapped Messenger's session, or null if
     *  that session is not an XASession
     */
    protected XAResource getXAResource() throws Exception {
        Session jmsSession = messenger.getSession();
        if (!(jmsSession instanceof XASession)) {
            return null;
        }
        return ((XASession) jmsSession).getXAResource();
    }
}
1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/XACapable.java
Index: XACapable.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
* $Id: MessageDrivenObject.java,v 1.2 2002/05/17 15:05:46 jstrachan Exp $
*/
package org.apache.commons.messenger;
import javax.transaction.Transaction;
/**
 * <p><code>XACapable</code> marks an object (typically a MessageListener in
 * this context) that can take part in an XA transaction, i.e. one that is
 * able to enlist and delist its own XA resources.</p>
 *
 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 * @version $Revision: 1.2 $
 */
public interface XACapable {

    /**
     * Called so that this object can enlist whatever XA resources it has
     * with the given XA transaction.
     *
     * @param transaction the transaction to enlist to
     * @throws Exception if enlistment fails
     */
    void enlistResources(Transaction transaction) throws Exception;

    /**
     * Called so that this object can delist the XA resources it previously
     * enlisted with the given XA transaction.
     *
     * @param transaction the transaction to delist resources from
     * @param flag is the flag used by JTA when delisting resources.
     *  It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL
     * @throws Exception if delistment fails
     */
    void delistResources(Transaction transaction, int flag) throws Exception;
}
1.2 +8 -2 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ConsumerThread.java
Index: ConsumerThread.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ConsumerThread.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ConsumerThread.java 21 Oct 2002 20:31:27 -0000 1.1
+++ ConsumerThread.java 8 Nov 2002 14:54:26 -0000 1.2
@@ -64,7 +64,13 @@
}
while (! isShouldStop()) {
- startTransaction();
+ try {
+ startTransaction();
+ }
+ catch (Exception e) {
+ log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e);
+ break;
+ }
try {
Message message = receive();
@@ -232,7 +238,7 @@
* Strategy method to represent the code required to start
* a transaction.
*/
- protected void startTransaction() {
+ protected void startTransaction() throws Exception {
}
/**
1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/XAConsumerThread.java
Index: XAConsumerThread.java
===================================================================
/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*
* $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $
*/
package org.apache.commons.messagelet;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.XASession;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.messenger.Messenger;
import org.apache.commons.messenger.XACapable;
import org.apache.commons.messenger.XACapableAdapter;
/**
 * <p><code>XAConsumerThread</code> is a thread which will perform XA processing
 * of messages.</p>
 *
 * <p>It implements the transaction strategy methods of ConsumerThread on top
 * of a JTA TransactionManager: a transaction is begun and the XA resources
 * are enlisted before each receive, and the resources are delisted when the
 * transaction is committed, rolled back or cancelled.</p>
 *
 * @author damon.hamacha
 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 * @version $Revision: 1.12 $
 */
public class XAConsumerThread extends ConsumerThread {

    /** Logger */
    private static final Log log = LogFactory.getLog(XAConsumerThread.class);

    /** The transaction manager, created lazily if not explicitly set */
    private TransactionManager transactionManager;

    /** The transaction begun by the most recent call to startTransaction() */
    private Transaction transaction;

    public XAConsumerThread() {
        setName("XAConsumer" + getName());
    }

    /**
     * Returns the configured TransactionManager, asking
     * {@link #createTransactionManager()} for one on first use if none has
     * been set.
     *
     * @return the TransactionManager to be used; may be null if none was set
     *  and the factory method did not supply one
     * @throws SystemException if the factory method fails
     */
    public TransactionManager getTransactionManager() throws SystemException {
        if (transactionManager == null) {
            transactionManager = createTransactionManager();
        }
        return transactionManager;
    }

    /**
     * Sets the transaction manager to be used
     *
     * @param transactionManager the transaction manager to be used
     */
    public void setTransactionManager(TransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Factory method to create a TransactionManager via some mechanism.
     * This default implementation does not locate one and returns null;
     * either call {@link #setTransactionManager(TransactionManager)} or
     * override this method (for example with a JNDI lookup).
     *
     * @return the TransactionManager to use, or null if none is available
     * @throws SystemException if the TransactionManager cannot be created
     */
    protected TransactionManager createTransactionManager() throws SystemException {
        return null;
    }

    /**
     * Enlists any resources with the current transaction.
     * The input Messenger's resources are enlisted first. Then if the
     * current MessageListener implements XACapable its resources are
     * enlisted as well.
     *
     * @param transaction the transaction to enlist resources with
     * @throws Exception if the enlistment fails for whatever reason
     */
    protected void enlist(Transaction transaction) throws Exception {
        XACapable xaCapable = getXACapable( getMessenger() );
        xaCapable.enlistResources(transaction);

        MessageListener listener = getListener();
        if (listener instanceof XACapable) {
            xaCapable = (XACapable) listener;
            xaCapable.enlistResources(transaction);
        }
    }

    /**
     * Delists any resources from the current transaction.
     * This includes the current input Messenger's resources as well
     * as any resources used by the MessageListener if it implements
     * XACapable
     *
     * @param transaction the transaction to delist resources from
     * @param flag is the flag used by JTA when delisting resources.
     *  It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL
     * @throws Exception if the delistment fails for whatever reason
     */
    protected void delist(Transaction transaction, int flag) throws Exception {
        XACapable xaCapable = getXACapable( getMessenger() );
        xaCapable.delistResources(transaction, flag);

        MessageListener listener = getListener();
        if (listener instanceof XACapable) {
            xaCapable = (XACapable) listener;
            xaCapable.delistResources(transaction, flag);
        }
    }

    /**
     * Strategy method to represent the code required to start
     * a transaction: begins a transaction on the TransactionManager and
     * enlists the XA resources with it.
     *
     * @throws IllegalStateException if no TransactionManager is available
     * @throws Exception if the transaction cannot be begun or the
     *  enlistment fails
     */
    protected void startTransaction() throws Exception {
        TransactionManager manager = getTransactionManager();
        if (manager == null) {
            // fail with a meaningful message rather than a bare NullPointerException
            throw new IllegalStateException(
                "No TransactionManager available. Call setTransactionManager() or override createTransactionManager()"
            );
        }
        manager.begin();
        transaction = manager.getTransaction();
        try {
            enlist(transaction);
        }
        catch (Exception e) {
            // do not leave a freshly begun transaction dangling when enlistment fails
            try {
                transaction.rollback();
            }
            catch (Exception rollbackFailure) {
                log.warn("Could not rollback transaction after failed enlistment. Reason: " + rollbackFailure, rollbackFailure);
            }
            throw e;
        }
    }

    /**
     * Strategy method to represent the code required to commit
     * a transaction.
     * The resources are delisted before the commit, since JTA does not
     * allow resources to be delisted from a transaction that is no longer
     * active.
     */
    protected void commitTransaction() throws Exception {
        delist(transaction, XAResource.TMSUCCESS);
        transaction.commit();
    }

    /**
     * Strategy method to represent the code required to rollback
     * a transaction.
     */
    protected void rollbackTransaction() throws Exception {
        delistAndRollback();
    }

    /**
     * Strategy method to represent the code required to mark
     * a transaction as rollback-only after a processing failure.
     * The transaction itself is not completed here.
     * NOTE(review): presumably the caller completes the rollback
     * afterwards; confirm against ConsumerThread.
     *
     * @param e the exception which caused the transaction to be marked
     */
    protected void rollbackOnlyTransaction(Exception e) throws Exception {
        transaction.setRollbackOnly();
        // pass the exception as the throwable so that its stack trace is logged
        log.error("Transaction marked as rollback only. Reason: " + e, e);
        delist(transaction, XAResource.TMFAIL);
    }

    /**
     * Strategy method to represent the code required to cancel
     * a transaction.
     * This is called when a message is not received.
     */
    protected void cancelTransaction() throws Exception {
        delistAndRollback();
    }

    /**
     * @return an XACapable for the given Messenger: the Messenger itself
     *  if it implements XACapable, otherwise an XACapableAdapter around it
     */
    protected XACapable getXACapable(Messenger messenger) {
        if (messenger instanceof XACapable) {
            return (XACapable) messenger;
        }
        return new XACapableAdapter(messenger);
    }

    /**
     * Delists the resources with TMFAIL while the transaction is still
     * active and then rolls it back. The rollback is attempted even if
     * the delistment fails.
     */
    private void delistAndRollback() throws Exception {
        try {
            delist(transaction, XAResource.TMFAIL);
        }
        finally {
            transaction.rollback();
        }
    }
}
1.11 +6 -15 jakarta-commons-sandbox/messenger/project.xml
Index: project.xml
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/project.xml,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- project.xml 2 Oct 2002 15:00:04 -0000 1.10
+++ project.xml 8 Nov 2002 14:54:26 -0000 1.11
@@ -4,7 +4,7 @@
<pomVersion>3</pomVersion>
<name>commons-messenger</name>
<id>commons-messenger</id>
- <currentVersion>1.0-dev-8</currentVersion>
+ <currentVersion>1.0-dev-9</currentVersion>
<organization>
<name>Apache Software Foundation</name>
<url>http://www.apache.org</url>
@@ -72,56 +72,47 @@
<dependency>
<id>commons-logging</id>
- <type>required</type>
<version>1.0</version>
- <jar>commons-logging-1.0.jar</jar>
</dependency>
<dependency>
<id>commons-beanutils</id>
- <type>required</type>
<version>1.3</version>
- <jar>commons-beanutils-1.3.jar</jar>
</dependency>
<dependency>
<id>commons-collections</id>
- <type>required</type>
<version>2.0</version>
- <jar>commons-collections-2.0.jar</jar>
</dependency>
<dependency>
<id>commons-digester</id>
- <type>required</type>
<version>1.2</version>
- <jar>commons-digester-1.2.jar</jar>
</dependency>
<dependency>
<id>servletapi</id>
- <type>required</type>
<version>2.3</version>
- <jar>servletapi-2.3.jar</jar>
</dependency>
<dependency>
<id>jms</id>
- <type>required</type>
<version>1.0.2b</version>
- <jar>jms-1.0.2b.jar</jar>
+ </dependency>
+
+ <dependency>
+ <id>jta</id>
+ <version>1.0.1</version>
</dependency>
<dependency>
<id>xml-apis</id>
- <type>required</type>
<version>2.0.0</version>
</dependency>
<dependency>
<id>ant</id>
<version>1.4.1</version>
- <jar>ant-1.4.1.jar</jar>
</dependency>
<!-- runtime dependencies only required for testing and sample programs -->
--
To unsubscribe, e-mail: <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>