Posted to users@activemq.apache.org by Eugene Prokopiev <pr...@stc.donpac.ru> on 2006/08/18 12:13:47 UTC

Using PooledSpringXAConnectionFactory in different threads

Hi,

Is it possible to create sessions from PooledSpringXAConnectionFactory 
in different threads?

I have this simple context:

<beans>

	<bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
		<property name="persistent" value="false"/>
		<property name="transportConnectorURIs">
			<list>
				<value>tcp://localhost:5000</value>
			</list>
		</property>
	</bean>
	
	<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/>
	<bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
		<property name="userTransaction" ref="jotm"/>
	</bean>
	
	<bean id="connectionFactory" class="org.jencks.pool.PooledSpringXAConnectionFactory">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQXAConnectionFactory">
				<property name="brokerURL" value="tcp://localhost:5000" />
			</bean>
		</property>
		<property name="transactionManager" ref="jotm"/>
	</bean>
	
	<bean id="messageReceiver" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
		<property name="transactionManager" ref="jotmTransactionManager"/>
		<property name="transactionAttributes">
			<props>
				<prop key="*">PROPAGATION_REQUIRED</prop>
			</props>
		</property>
		<property name="target">
			<bean class="simple.MessageReceiver">
				<property name="connectionFactory" ref="connectionFactory"/>
			</bean>
		</property>
		<property name="proxyTargetClass" value="true"/>
	</bean>
	
</beans>

MessageReceiver.java is:

public class MessageReceiver {

	private Log log = LogFactory.getLog(getClass());
	
	private ConnectionFactory connectionFactory;

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}

	public void receive() {
		Thread readerThread = new Thread(new Runnable(){
			public void run() {
				try {
					Connection connection = connectionFactory.createConnection();
					connection.start();
					Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
					Destination destination = session.createQueue("messages.input");
					MessageConsumer consumer = session.createConsumer(destination);
					log.debug("receive message ...");
					while (true) {
						Message message = consumer.receive(Long.MAX_VALUE);
						log.debug("received message : " + message);
					}
				} catch (Exception e) {
					e.printStackTrace();
				}				
			}			
		});
		readerThread.start();
	}
	
}

After starting the context, calling MessageReceiver.receive() and 
producing a message, I got:


INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is 
starting
INFO  BrokerService - For help or more information please see: 
http://incubator.apache.org/activemq/
INFO  TransportServerThreadSupport - Listening for connections at: 
tcp://prokopiev.stc.donpac.ru:5000
INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 
Started
INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, 
ID:prokopiev.stc.donpac.ru-38166-1155892764301-0:0) started
INFO  jotm - JOTM started with a local transaction factory which is not 
bound.
INFO  jotm - CAROL initialization
INFO  ConfigurationRepository - No protocols were defined for property 
'carol.protocols', trying with default protocol = 'jrmp'.
INFO  jta - JOTM 2.0.10
INFO  JtaTransactionManager - Using JTA UserTransaction: 
org.objectweb.jotm.Current@19836ed
INFO  JtaTransactionManager - Using JTA TransactionManager: 
org.objectweb.jotm.Current@19836ed
INFO  ManagementContext - JMX consoles can connect to 
service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO  DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass 
feature enabled
DEBUG MessageReceiver - receive message ...
DEBUG PooledSpringXAConnection - -->> ENTERING 
PooledSpringXAConnection.createXASession()
DEBUG PooledSpringXAConnection - -->> THERE IS NO ACTIVE TRANSACTION, SO 
JUST RETURNING BORROWED SESSION...
DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
DEBUG XASessionPool - ---->>>>> CREATING NEW SESSION TO SATISFY REQUEST!!
DEBUG XASessionPool - ---->>>>> BORROWED SESSION: 
org.jencks.pool.PooledSpringXASession@4c47db
javax.jms.JMSException: Session's XAResource has not been enlisted in a 
distributed transaction.
	at 
org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109)
	at 
org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:658)
	at 
org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:610)
	at 
org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:469)
	at simple.MessageReceiver$1.run(MessageReceiver.java:35)
	at java.lang.Thread.run(Thread.java:595)


After moving session creation out of the separate thread, I got:

INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is 
starting
INFO  BrokerService - For help or more information please see: 
http://incubator.apache.org/activemq/
INFO  TransportServerThreadSupport - Listening for connections at: 
tcp://prokopiev.stc.donpac.ru:5000
INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 
Started
INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, 
ID:prokopiev.stc.donpac.ru-38153-1155892708903-0:0) started
INFO  jotm - JOTM started with a local transaction factory which is not 
bound.
INFO  jotm - CAROL initialization
INFO  ConfigurationRepository - No protocols were defined for property 
'carol.protocols', trying with default protocol = 'jrmp'.
INFO  jta - JOTM 2.0.10
INFO  JtaTransactionManager - Using JTA UserTransaction: 
org.objectweb.jotm.Current@949f69
INFO  JtaTransactionManager - Using JTA TransactionManager: 
org.objectweb.jotm.Current@949f69
INFO  ManagementContext - JMX consoles can connect to 
service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO  DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass 
feature enabled
DEBUG MessageReceiver - receive message ...
DEBUG PooledSpringXAConnection - -->> ENTERING 
PooledSpringXAConnection.createXASession()
DEBUG PooledSpringXAConnection - -->> ACTUAL TRANSACTION IS ACTIVE!
DEBUG PooledSpringXAConnection - -->> NO ACTIVE SESSION ASSOCIATED WITH 
CURRENT THREAD, BORROWING...
DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
DEBUG XASessionPool - ---->>>>> CREATING NEW SESSION TO SATISFY REQUEST!!
DEBUG XASessionPool - ---->>>>> BORROWED SESSION: 
org.jencks.pool.PooledSpringXASession@1402d5a
DEBUG PooledSpringXAConnection - -->> ENLISTING NEW SESSION'S XAResource 
WITH TRANSACTION...
DEBUG PooledSpringXAConnection - -->> BINDING NEW SESSION WITH 
TRANSACTION...
DEBUG PooledSpringXAConnection - -->> REGISTERING SYNCHRONIZATION WITH 
TRANSACTION...
DEBUG MessageReceiver - received message : ActiveMQObjectMessage 
{commandId = 5, responseRequired = true, messageId = 
ID:prokopiev.stc.donpac.ru-38161-1155892719980-0:0:1:1:1, 
originalDestination = null, originalTransactionId = null, producerId = 
ID:prokopiev.stc.donpac.ru-38161-1155892719980-0:0:1:1, destination = 
queue://messages.input, transactionId = null, expiration = 0, timestamp 
= 1155892720450, arrival = 0, correlationId = null, replyTo = null, 
persistent = true, type = null, priority = 4, groupID = null, 
groupSequence = 0, targetConsumerId = null, compressed = false, userID = 
null, content = org.apache.activeio.packet.ByteSequence@b40ec4, 
marshalledProperties = null, dataStructure = null, redeliveryCounter = 
0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody 
= true}

Why is there no active transaction in the first case? How can I activate it?

--
Thanks,
Eugene Prokopiev


Re: Using PooledSpringXAConnectionFactory in different threads

Posted by Eugene Prokopiev <pr...@stc.donpac.ru>.
> All works as I need with Spring programmatic transaction demarcation in 
> separate thread.

This configuration doesn't work with Spring message-driven POJOs. I 
described the problem in 
http://opensource.atlassian.com/projects/spring/browse/SPR-2461. Can 
anybody comment on it in more detail, to help the Spring developers 
resolve it before the 2.0 release?


Re: Using PooledSpringXAConnectionFactory in different threads

Posted by Eugene Prokopiev <pr...@stc.donpac.ru>.
Everything works as I need with Spring programmatic transaction 
demarcation in a separate thread.
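
A likely reason the proxy-based setup from the first post failed while this one works: JTA associates a transaction with the thread that begins it, so a transaction opened by the proxy around receive() on the calling thread is not visible inside a newly started thread. The sketch below only simulates that rule with a plain ThreadLocal; ThreadBoundTxDemo, CURRENT_TX and the helper methods are hypothetical names, not part of JOTM, Spring, or Jencks.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadBoundTxDemo {

    // Hypothetical stand-in for a transaction manager: the "current
    // transaction" is bound to the calling thread, as it is in JTA.
    public static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Runs the task on a fresh thread and waits for it to finish.
    private static void runAndJoin(Runnable task) {
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // What a new thread sees when the transaction was begun by its creator.
    public static String txSeenByNewThread() {
        AtomicReference<String> seen = new AtomicReference<>("unset");
        runAndJoin(() -> seen.set(CURRENT_TX.get()));
        return seen.get();
    }

    // What a new thread sees when it demarcates the transaction itself.
    public static String txSeenWhenBegunInsideThread() {
        AtomicReference<String> seen = new AtomicReference<>("unset");
        runAndJoin(() -> {
            CURRENT_TX.set("tx-2"); // begin inside the worker thread
            try {
                seen.set(CURRENT_TX.get());
            } finally {
                CURRENT_TX.remove(); // end of the simulated transaction
            }
        });
        return seen.get();
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1"); // roughly what a transactional proxy does around a call
        try {
            System.out.println("caller thread sees: " + CURRENT_TX.get());
            System.out.println("new thread sees: " + txSeenByNewThread());
            System.out.println("self-demarcating thread sees: " + txSeenWhenBegunInsideThread());
        } finally {
            CURRENT_TX.remove();
        }
    }
}
```

Under these assumptions the new thread sees no transaction unless it begins one itself, which matches the "THERE IS NO ACTIVE TRANSACTION" line in the first log and the "ACTUAL TRANSACTION IS ACTIVE!" line once demarcation happens in the thread that uses the session.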

context.xml:

<beans>

	<bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
		<property name="persistent" value="false"/>
		<property name="transportConnectorURIs">
			<list>
				<value>tcp://localhost:5000</value>
			</list>
		</property>
	</bean>
	
	<bean id="jotm" class="org.springframework.transaction.jta.JotmFactoryBean"/>
	<bean id="jotmTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
		<property name="userTransaction" ref="jotm"/>
	</bean>
	<bean id="jotmTransactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
		<property name="transactionManager" ref="jotmTransactionManager"/>
	</bean>
	
	<bean id="jmsConnectionFactory" class="org.jencks.pool.PooledSpringXAConnectionFactory">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQXAConnectionFactory">
				<property name="brokerURL" value="tcp://localhost:5000" />
			</bean>
		</property>
		<property name="transactionManager" ref="jotm"/>
	</bean>
	
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactory"/>
		<property name="defaultDestinationName" value="messages.input"/>
	</bean>
	
	<bean id="messageReceiver" class="simple.MessageReceiver" init-method="receive">
		<property name="transactionTemplate" ref="jotmTransactionTemplate"/>
		<property name="jmsTemplate" ref="jmsTemplate"/>
	</bean>
	
</beans>

MessageReceiver.java:

public class MessageReceiver {

	private Log log = LogFactory.getLog(getClass());
	
	private JmsTemplate jmsTemplate;
	private TransactionTemplate transactionTemplate;

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	
	public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
		this.transactionTemplate = transactionTemplate;
	}

	public void receive() {
		Thread readerThread = new Thread(new Runnable(){
			public void run() {
				while (!Thread.currentThread().isInterrupted()) {
					transactionTemplate.execute(new TransactionCallbackWithoutResult() {
						public void doInTransactionWithoutResult(TransactionStatus status) {
							Message message = jmsTemplate.receive();
							log.debug(message);
						}
					});
				}
			}			
		});
		readerThread.start();
	}
	
}

After starting it and sending a message to queue://messages.input, I see:

INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is 
starting
INFO  BrokerService - For help or more information please see: 
http://incubator.apache.org/activemq/
INFO  TransportServerThreadSupport - Listening for connections at: 
tcp://prokopiev.stc.donpac.ru:5000
INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000 
Started
INFO  BrokerService - ActiveMQ JMS Message Broker (localhost, 
ID:prokopiev.stc.donpac.ru-42291-1156317141577-0:0) started
INFO  jotm - JOTM started with a local transaction factory which is not 
bound.
INFO  jotm - CAROL initialization
INFO  ConfigurationRepository - No protocols were defined for property 
'carol.protocols', trying with default protocol = 'jrmp'.
INFO  jta - JOTM 2.0.10
INFO  JtaTransactionManager - Using JTA UserTransaction: 
org.objectweb.jotm.Current@d9660d
INFO  JtaTransactionManager - Using JTA TransactionManager: 
org.objectweb.jotm.Current@d9660d
INFO  ManagementContext - JMX consoles can connect to 
service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
DEBUG PooledSpringXAConnection - -->> ENTERING 
PooledSpringXAConnection.createXASession()
DEBUG PooledSpringXAConnection - -->> ACTUAL TRANSACTION IS ACTIVE!
DEBUG PooledSpringXAConnection - -->> NO ACTIVE SESSION ASSOCIATED WITH 
CURRENT THREAD, BORROWING...
DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
DEBUG XASessionPool - ---->>>>> CREATING NEW SESSION TO SATISFY REQUEST!!
DEBUG XASessionPool - ---->>>>> BORROWED SESSION: 
org.jencks.pool.PooledSpringXASession@b60b93
DEBUG PooledSpringXAConnection - -->> ENLISTING NEW SESSION'S XAResource 
WITH TRANSACTION...
DEBUG PooledSpringXAConnection - -->> BINDING NEW SESSION WITH 
TRANSACTION...
DEBUG PooledSpringXAConnection - -->> REGISTERING SYNCHRONIZATION WITH 
TRANSACTION...

DEBUG PooledSpringXASession - ---->>>>> PooledSpringXASession.close() called
DEBUG PooledSpringXASession - ---->>>>> ignoreClose IS TRUE!  KEEPING 
SESSION OPEN!
DEBUG MessageReceiver - ActiveMQObjectMessage {commandId = 5, 
responseRequired = true, messageId = 
ID:prokopiev.stc.donpac.ru-42299-1156317149577-0:0:1:1:1, 
originalDestination = null, originalTransactionId = null, producerId = 
ID:prokopiev.stc.donpac.ru-42299-1156317149577-0:0:1:1, destination = 
queue://messages.input, transactionId = null, expiration = 0, timestamp 
= 1156317150047, arrival = 0, correlationId = null, replyTo = null, 
persistent = true, type = null, priority = 4, groupID = null, 
groupSequence = 0, targetConsumerId = null, compressed = false, userID = 
null, content = org.apache.activeio.packet.ByteSequence@14835fb, 
marshalledProperties = null, dataStructure = null, redeliveryCounter = 
0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody 
= true}
DEBUG PooledSpringXAConnection - -->> 
PooledSpringXAConnection.[synchronization].afterCompletion() CALLED...
DEBUG PooledSpringXAConnection - -->> RETURNING JMS SESSION TO POOL...
DEBUG PooledSpringXASession - ---->>>>> PooledSpringXASession.close() called
DEBUG PooledSpringXASession - ---->>>>> ignoreClose = false, so 
returning session pool...
DEBUG XASessionPool - ---->>>>> SESSION HAS BEEN RETURNED TO POOL: 
org.jencks.pool.PooledSpringXASession@b60b93
DEBUG PooledSpringXAConnection - -->> ENTERING 
PooledSpringXAConnection.createXASession()
DEBUG PooledSpringXAConnection - -->> ACTUAL TRANSACTION IS ACTIVE!
DEBUG PooledSpringXAConnection - -->> NO ACTIVE SESSION ASSOCIATED WITH 
CURRENT THREAD, BORROWING...
DEBUG XASessionPool - ---->>>>> BORROWING JMS SESSION FROM POOL...
DEBUG XASessionPool - ---->>>>> BORROWED SESSION: 
org.jencks.pool.PooledSpringXASession@b60b93
DEBUG PooledSpringXAConnection - -->> ENLISTING NEW SESSION'S XAResource 
WITH TRANSACTION...
DEBUG PooledSpringXAConnection - -->> BINDING NEW SESSION WITH 
TRANSACTION...
DEBUG PooledSpringXAConnection - -->> REGISTERING SYNCHRONIZATION WITH 
TRANSACTION...

INFO  jotm - set rollback only (tx=bb14:38:0:0148841f24dbed9394...eebc02:)

I am confused by the last message, but the transaction is committed, as 
far as I can see from queue://messages.input: the queue is empty after 
the message is read from it.
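
One possible explanation for the "set rollback only" line, which this thread does not confirm: after the first message is consumed, the loop opens a second transaction and jmsTemplate.receive() blocks on the now-empty queue; if the transaction manager applies a transaction timeout, the waiting transaction may be marked rollback-only when that timeout expires. If that is the cause, bounding each receive so that an idle transaction ends before the timeout would avoid it. The sketch below simulates the pattern with a plain BlockingQueue instead of JMS; BoundedReceiveDemo, receiveOnce and drain are hypothetical names, and the begin/commit points are only comments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedReceiveDemo {

    // Hypothetical stand-in for one transactional receive: waits at most
    // receiveTimeoutMs for a message and returns null if none arrived,
    // so the surrounding unit of work can finish promptly.
    public static String receiveOnce(BlockingQueue<String> queue, long receiveTimeoutMs) {
        try {
            return queue.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    // Runs up to 'attempts' short units of work and collects what was received.
    public static List<String> drain(BlockingQueue<String> queue, int attempts, long receiveTimeoutMs) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < attempts && !Thread.currentThread().isInterrupted(); i++) {
            // a real implementation would begin the transaction here
            String message = receiveOnce(queue, receiveTimeoutMs);
            if (message != null) {
                received.add(message);
            }
            // ... and commit here; an empty poll just ends an empty unit of work
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        queue.add("m2");
        System.out.println(drain(queue, 4, 50)); // two messages, then two empty polls
    }
}
```

In the configuration above, the analogous setting would be the receiveTimeout property of JmsTemplate, chosen shorter than whatever transaction timeout is in effect; the right values are an assumption to check against the actual setup.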

-- 
Thanks,
Eugene Prokopiev