You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Eugene Prokopiev <pr...@stc.donpac.ru> on 2006/08/01 15:45:19 UTC

Jencks : transactional consuming and producing message problem

Hi,

I use this context.xml to run AMQ+Jencks+Consumer+Producer:

<beans>
	
	<bean id="broker" class="org.apache.activemq.broker.BrokerService" 
init-method="start" destroy-method="stop">
		<property name="persistent" value="false"/>
		<property name="transportConnectorURIs">
			<list>
				<value>tcp://localhost:5000</value>
			</list>
		</property>
	</bean>
	
	<bean id="transactionContextManager" 
class="org.jencks.factory.TransactionContextManagerFactoryBean"/>
	<bean id="userTransaction" 
class="org.jencks.factory.GeronimoTransactionManagerFactoryBean"/>
	<bean id="jtaTransactionManager" 
class="org.springframework.transaction.jta.JtaTransactionManager">
		<property name="userTransaction" ref="userTransaction" />
	</bean>
	
	<bean id="jmsResourceAdapter" 
class="org.apache.activemq.ra.ActiveMQResourceAdapter">
		<property name="serverUrl" value="tcp://localhost:5000"/>
	</bean>

	<bean id="jencks" class="org.jencks.JCAContainer">
		<property name="bootstrapContext">
			<bean class="org.jencks.factory.BootstrapContextFactoryBean">
				<property name="threadPoolSize" value="25"/>
			</bean>
		</property>
		<property name="resourceAdapter" ref="jmsResourceAdapter"/>
	</bean>

	<bean id="inboundConnector" class="org.jencks.JCAConnector">
		<property name="jcaContainer" ref="jencks" />
		<property name="activationSpec">
			<bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
				<property name="destination" value="messages.input"/>
				<property name="destinationType" value="javax.jms.Queue"/>
			</bean>
		</property>
		<property name="transactionManager" ref="userTransaction"/>
		<property name="ref" value="echoBean"/>
	</bean>
	
	<bean id="echoBean" class="transaction.test.EchoBean">
		<property name="template">
			<bean class="org.springframework.jms.core.JmsTemplate">
			    <property name="connectionFactory">
			    	<bean 
class="org.springframework.jca.support.LocalConnectionFactoryBean">
						<property name="managedConnectionFactory">
							<bean 
class="org.apache.activemq.ra.ActiveMQManagedConnectionFactory">
								<property name="resourceAdapter" ref="jmsResourceAdapter"/>
							</bean>
						</property>
					</bean>
			    </property>
			</bean>
		</property>
		<property name="destination">
			<bean class="org.apache.activemq.command.ActiveMQQueue">
				<constructor-arg value="messages.output"/>
			</bean>
		</property>
	</bean>

</beans>

class transaction.test.EchoBean is:

public class EchoBean implements MessageListener {
	
	private Log log = LogFactory.getLog(getClass());
	
	private JmsTemplate template;
	private Destination destination;
	
	public void setDestination(Destination destination) {
		this.destination = destination;
	}
	
	public void setTemplate(JmsTemplate template) {
		this.template = template;
	}
	
	public void onMessage(final Message message) {
		log.debug("MESSAGE : "+message);
		template.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return message;
			}			
		});
	}

}

So, it must get message from messages.input and put it to 
messages.output. On starting context and put message to messages.input 
with external application I got:

DEBUG org.apache.activemq.ra.ServerSessionPoolImpl 
      ServerSession requested.
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl 
      Created a new session: ServerSessionImpl:0
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Starting run.
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Work accepted: 
javax.resource.spi.work.WorkEvent[source=org.apache.geronimo.connector.work.GeronimoWorkManager@138c63]
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Work started: javax.resource.spi.work.WorkEvent[source=Work 
:ServerSessionImpl:0]
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Running
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      run loop start
DEBUG org.apache.activemq.TransactionContext 
      Start: 
[globalId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000,branchId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000]
DEBUG org.apache.activemq.TransactionContext 
      Started XA transaction: 
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000
DEBUG org.jencks.XAEndpoint 
      Transaction started and resource enlisted
DEBUG transaction.test.EchoBean 
      MESSAGE : ActiveMQTextMessage {commandId = 5, responseRequired = 
true, messageId = 
ID:prokopiev.stc.donpac.ru-32907-1154439031254-0:0:1:1:1, 
originalDestination = null, originalTransactionId = null, producerId = 
ID:prokopiev.stc.donpac.ru-32907-1154439031254-0:0:1:1, destination = 
queue://messages.input, transactionId = null, expiration = 0, timestamp 
= 1154439031649, arrival = 0, correlationId = null, replyTo = null, 
persistent = true, type = null, priority = 4, groupID = null, 
groupSequence = 0, targetConsumerId = null, compressed = false, userID = 
null, content = org.apache.activeio.packet.ByteSequence@15575e0, 
marshalledProperties = null, dataStructure = null, redeliveryCounter = 
0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody 
= true, text = null}
DEBUG org.apache.activemq.ActiveMQSession 
      Sending message: ActiveMQTextMessage {commandId = 5, 
responseRequired = true, messageId = 
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1, 
originalDestination = null, originalTransactionId = null, producerId = 
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1, destination = 
queue://messages.output, transactionId = null, expiration = 0, timestamp 
= 1154439031934, arrival = 0, correlationId = null, replyTo = null, 
persistent = true, type = null, priority = 4, groupID = null, 
groupSequence = 0, targetConsumerId = null, compressed = false, userID = 
null, content = org.apache.activeio.packet.ByteSequence@15575e0, 
marshalledProperties = null, dataStructure = null, redeliveryCounter = 
0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody 
= true, text = null}
INFO  org.apache.activemq.broker.region.PrefetchSubscription 
      Could not correlate acknowledgment with dispatched message: 
MessageAck {commandId = 6, responseRequired = false, ackType = 2, 
consumerId = ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:0:-1:2, 
firstMessageId = 
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1, lastMessageId 
= ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1, destination 
= queue://messages.input, transactionId = 
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000, 
messageCount = 1}
DEBUG org.apache.activemq.TransactionContext 
      End: 
[globalId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000,branchId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000]
DEBUG org.apache.activemq.ra.ActiveMQEndpointWorker 
      Reconnect cause:
javax.jms.JMSException: Invalid acknowledgment: MessageAck {commandId = 
6, responseRequired = false, ackType = 2, consumerId = 
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:0:-1:2, firstMessageId 
= ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1, 
lastMessageId = 
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1, destination = 
queue://messages.input, transactionId = 
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000, 
messageCount = 1}
	at 
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:181)
	at 
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:247)
	at 
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:377)
	at 
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:176)
	at 
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:69)
	at 
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:69)
	at 
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:83)
	at 
org.apache.activemq.broker.AbstractConnection.processMessageAck(AbstractConnection.java:382)
	at org.apache.activemq.command.MessageAck.visit(MessageAck.java:178)
	at 
org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:226)
	at 
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
	at 
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:91)
	at 
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
	at 
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:117)
	at 
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:122)
	at 
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:87)
	at 
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:127)
	at java.lang.Thread.run(Thread.java:595)
DEBUG org.apache.activemq.TransactionContext 
      Ended XA transaction: 
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000
DEBUG org.apache.activemq.TransactionContext 
      End: 
[globalId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000,branchId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000]
DEBUG org.apache.activemq.TransactionContext 
      Commit: 
[globalId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000,branchId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000]
DEBUG org.apache.activemq.transaction.XATransaction 
      XA Transaction commit: 
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000
DEBUG org.jencks.XAEndpoint 
      Transaction committed
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      run loop end
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl 
      Session returned to pool: ServerSessionImpl:0
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Run finished
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Work completed: javax.resource.spi.work.WorkEvent[source=Work 
:ServerSessionImpl:0]
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl 
      ServerSession requested.
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl 
      Using idle session: ServerSessionImpl:0
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Starting run.
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Work accepted: 
javax.resource.spi.work.WorkEvent[source=org.apache.geronimo.connector.work.GeronimoWorkManager@138c63]
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Work started: javax.resource.spi.work.WorkEvent[source=Work 
:ServerSessionImpl:0]
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Running
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      run loop start
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      run loop end
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl 
      Session returned to pool: ServerSessionImpl:0
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Run finished
DEBUG org.apache.activemq.ra.ServerSessionImpl:0 
      Work completed: javax.resource.spi.work.WorkEvent[source=Work 
:ServerSessionImpl:0]

Next I run jconsole go to org.apache.activemq/localhost/Queues/ and see 
that messages.input and messages.output has attribute QueueSize set to 
1. So, message was not deleted from messages.input but placed to 
messages.output. I need to place message only to one location and tried 
to use transactions to guarantee it.

What's wrong?

--
Thanks,
Eugene Prokopiev


Re: Jencks : transactional consuming and producing message problem

Posted by Eugene Prokopiev <pr...@stc.donpac.ru>.
Sorry, my question is: why message was not deleted from messages.input 
but placed to messages.output on executing wrong code? Can I rollback 
message consuming from queue on sending error?


Re: Jencks : transactional consuming and producing message problem

Posted by Eugene Prokopiev <pr...@stc.donpac.ru>.
>     public void onMessage(final Message message) {
>         log.debug("MESSAGE : "+message);
>         template.send(destination, new MessageCreator() {
>             public Message createMessage(Session session) throws 
> JMSException {
>                 return message;
>             }           
>         });
>     }

I replaced this code with:

	public void onMessage(final Message message) {
		log.debug("MESSAGE : "+message);
		if (message instanceof TextMessage) {
			template.send(destination, new MessageCreator() {
				public Message createMessage(Session session) throws JMSException {
					return session.createTextMessage(((TextMessage)message).getText());
				}			
			});
		}		
	}

But the main problem was stay. Why message was not deleted from 
messages.input but placed to messages.output?

-- 
Thanks,
Eugene Prokopiev