Posted to users@activemq.apache.org by Eugene Prokopiev <pr...@stc.donpac.ru> on 2006/08/01 15:45:19 UTC
Jencks : transactional consuming and producing message problem
Hi,
I use the following context.xml to run ActiveMQ + Jencks with a consumer and a producer:
<beans>

  <bean id="broker" class="org.apache.activemq.broker.BrokerService"
        init-method="start" destroy-method="stop">
    <property name="persistent" value="false"/>
    <property name="transportConnectorURIs">
      <list>
        <value>tcp://localhost:5000</value>
      </list>
    </property>
  </bean>

  <bean id="transactionContextManager"
        class="org.jencks.factory.TransactionContextManagerFactoryBean"/>

  <bean id="userTransaction"
        class="org.jencks.factory.GeronimoTransactionManagerFactoryBean"/>

  <bean id="jtaTransactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="userTransaction" ref="userTransaction" />
  </bean>

  <bean id="jmsResourceAdapter"
        class="org.apache.activemq.ra.ActiveMQResourceAdapter">
    <property name="serverUrl" value="tcp://localhost:5000"/>
  </bean>

  <bean id="jencks" class="org.jencks.JCAContainer">
    <property name="bootstrapContext">
      <bean class="org.jencks.factory.BootstrapContextFactoryBean">
        <property name="threadPoolSize" value="25"/>
      </bean>
    </property>
    <property name="resourceAdapter" ref="jmsResourceAdapter"/>
  </bean>

  <bean id="inboundConnector" class="org.jencks.JCAConnector">
    <property name="jcaContainer" ref="jencks" />
    <property name="activationSpec">
      <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
        <property name="destination" value="messages.input"/>
        <property name="destinationType" value="javax.jms.Queue"/>
      </bean>
    </property>
    <property name="transactionManager" ref="userTransaction"/>
    <property name="ref" value="echoBean"/>
  </bean>

  <bean id="echoBean" class="transaction.test.EchoBean">
    <property name="template">
      <bean class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
          <bean class="org.springframework.jca.support.LocalConnectionFactoryBean">
            <property name="managedConnectionFactory">
              <bean class="org.apache.activemq.ra.ActiveMQManagedConnectionFactory">
                <property name="resourceAdapter" ref="jmsResourceAdapter"/>
              </bean>
            </property>
          </bean>
        </property>
      </bean>
    </property>
    <property name="destination">
      <bean class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="messages.output"/>
      </bean>
    </property>
  </bean>

</beans>
The class transaction.test.EchoBean is:
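package transaction.test;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class EchoBean implements MessageListener {

    private Log log = LogFactory.getLog(getClass());
    private JmsTemplate template;
    private Destination destination;

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }

    // Logs the incoming message and forwards the same message instance.
    public void onMessage(final Message message) {
        log.debug("MESSAGE : "+message);
        template.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return message;
            }
        });
    }
}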
So it should take each message from messages.input and put it on
messages.output. After starting the context and sending a message to
messages.input from an external application, I got this log:
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl
ServerSession requested.
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl
Created a new session: ServerSessionImpl:0
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Starting run.
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Work accepted:
javax.resource.spi.work.WorkEvent[source=org.apache.geronimo.connector.work.GeronimoWorkManager@138c63]
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Work started: javax.resource.spi.work.WorkEvent[source=Work
:ServerSessionImpl:0]
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Running
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
run loop start
DEBUG org.apache.activemq.TransactionContext
Start:
[globalId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000,branchId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000]
DEBUG org.apache.activemq.TransactionContext
Started XA transaction:
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000
DEBUG org.jencks.XAEndpoint
Transaction started and resource enlisted
DEBUG transaction.test.EchoBean
MESSAGE : ActiveMQTextMessage {commandId = 5, responseRequired =
true, messageId =
ID:prokopiev.stc.donpac.ru-32907-1154439031254-0:0:1:1:1,
originalDestination = null, originalTransactionId = null, producerId =
ID:prokopiev.stc.donpac.ru-32907-1154439031254-0:0:1:1, destination =
queue://messages.input, transactionId = null, expiration = 0, timestamp
= 1154439031649, arrival = 0, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID =
null, content = org.apache.activeio.packet.ByteSequence@15575e0,
marshalledProperties = null, dataStructure = null, redeliveryCounter =
0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody
= true, text = null}
DEBUG org.apache.activemq.ActiveMQSession
Sending message: ActiveMQTextMessage {commandId = 5,
responseRequired = true, messageId =
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1,
originalDestination = null, originalTransactionId = null, producerId =
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1, destination =
queue://messages.output, transactionId = null, expiration = 0, timestamp
= 1154439031934, arrival = 0, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID =
null, content = org.apache.activeio.packet.ByteSequence@15575e0,
marshalledProperties = null, dataStructure = null, redeliveryCounter =
0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody
= true, text = null}
INFO org.apache.activemq.broker.region.PrefetchSubscription
Could not correlate acknowledgment with dispatched message:
MessageAck {commandId = 6, responseRequired = false, ackType = 2,
consumerId = ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:0:-1:2,
firstMessageId =
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1, lastMessageId
= ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1, destination
= queue://messages.input, transactionId =
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000,
messageCount = 1}
DEBUG org.apache.activemq.TransactionContext
End:
[globalId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000,branchId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000]
DEBUG org.apache.activemq.ra.ActiveMQEndpointWorker
Reconnect cause:
javax.jms.JMSException: Invalid acknowledgment: MessageAck {commandId =
6, responseRequired = false, ackType = 2, consumerId =
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:0:-1:2, firstMessageId
= ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1,
lastMessageId =
ID:prokopiev.stc.donpac.ru-60565-1154439019835-3:1:1:1:1, destination =
queue://messages.input, transactionId =
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000,
messageCount = 1}
at
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:181)
at
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:247)
at
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:377)
at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:176)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:69)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:69)
at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:83)
at
org.apache.activemq.broker.AbstractConnection.processMessageAck(AbstractConnection.java:382)
at org.apache.activemq.command.MessageAck.visit(MessageAck.java:178)
at
org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:226)
at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
at
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:91)
at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:117)
at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:122)
at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:87)
at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:127)
at java.lang.Thread.run(Thread.java:595)
DEBUG org.apache.activemq.TransactionContext
Ended XA transaction:
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000
DEBUG org.apache.activemq.TransactionContext
End:
[globalId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000,branchId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000]
DEBUG org.apache.activemq.TransactionContext
Commit:
[globalId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000,branchId=100000005748415420444f2057452043414c4c2049543f0000000000000000000000000000000000000]
DEBUG org.apache.activemq.transaction.XATransaction
XA Transaction commit:
XID:1197822575:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000:01000000000000005748415420444f2057452043414c4c2049543f00000000000000000000000000000000000000000000000000000000000000000000000000
DEBUG org.jencks.XAEndpoint
Transaction committed
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
run loop end
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl
Session returned to pool: ServerSessionImpl:0
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Run finished
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Work completed: javax.resource.spi.work.WorkEvent[source=Work
:ServerSessionImpl:0]
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl
ServerSession requested.
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl
Using idle session: ServerSessionImpl:0
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Starting run.
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Work accepted:
javax.resource.spi.work.WorkEvent[source=org.apache.geronimo.connector.work.GeronimoWorkManager@138c63]
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Work started: javax.resource.spi.work.WorkEvent[source=Work
:ServerSessionImpl:0]
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Running
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
run loop start
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
run loop end
DEBUG org.apache.activemq.ra.ServerSessionPoolImpl
Session returned to pool: ServerSessionImpl:0
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Run finished
DEBUG org.apache.activemq.ra.ServerSessionImpl:0
Work completed: javax.resource.spi.work.WorkEvent[source=Work
:ServerSessionImpl:0]
Next I ran jconsole, went to org.apache.activemq/localhost/Queues/ and saw
that both messages.input and messages.output have the QueueSize attribute
set to 1. So the message was placed on messages.output but was not removed
from messages.input. I need the message to end up in only one place, and I
tried to use transactions to guarantee that.
What's wrong?
--
Thanks,
Eugene Prokopiev
Re: Jencks : transactional consuming and producing message problem
Posted by Eugene Prokopiev <pr...@stc.donpac.ru>.
Sorry, to be precise, my question is: why was the message not removed from
messages.input but still placed on messages.output when the faulty code
ran? Can I roll back the consumption of a message from the queue when
sending fails?
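To make the expected behaviour concrete, here is a minimal sketch in plain Java, with two in-memory queues standing in for messages.input and messages.output. It is only a toy model of the all-or-nothing semantics asked about above, not JMS, XA or Jencks code; the class name AtomicMoveSketch, the move method and the failOnSend flag are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: hypothetical names, no JMS or XA involved.
final class AtomicMoveSketch {

    // Takes the head of input and appends it to output.
    // If the simulated send fails, the message is put back at the head of
    // input, so it is never lost and never present in both queues.
    static void move(Deque<String> input, Deque<String> output, boolean failOnSend) {
        String msg = input.pollFirst();      // tentative consume
        if (msg == null) {
            return;                          // nothing to move
        }
        try {
            if (failOnSend) {
                throw new IllegalStateException("simulated send failure");
            }
            output.addLast(msg);             // "commit": message is only in output
        } catch (RuntimeException e) {
            input.addFirst(msg);             // "rollback": message is back in input
        }
    }

    public static void main(String[] args) {
        Deque<String> input = new ArrayDeque<>();
        Deque<String> output = new ArrayDeque<>();
        input.add("hello");

        move(input, output, true);
        System.out.println(input.size() + " " + output.size());  // prints "1 0"

        move(input, output, false);
        System.out.println(input.size() + " " + output.size());  // prints "0 1"
    }
}
```

In both cases the sizes of the two queues add up to 1, which is the outcome I expected from the transactional setup.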
Re: Jencks : transactional consuming and producing message problem
Posted by Eugene Prokopiev <pr...@stc.donpac.ru>.
> public void onMessage(final Message message) {
>     log.debug("MESSAGE : "+message);
>     template.send(destination, new MessageCreator() {
>         public Message createMessage(Session session) throws JMSException {
>             return message;
>         }
>     });
> }
I replaced this code with:
public void onMessage(final Message message) {
    log.debug("MESSAGE : "+message);
    if (message instanceof TextMessage) {
        template.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(((TextMessage)message).getText());
            }
        });
    }
}
But the main problem remains. Why was the message not removed from
messages.input but still placed on messages.output?
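For reference, the difference between the two onMessage versions can be modelled in plain Java. In the log from my first post, the message is logged with one messageId on receipt, the "Sending message" entry shows the same object with a new messageId, and the rejected MessageAck for messages.input carries that new ID. The sketch below only imitates that pattern; Msg, Broker and ResendSketch are hypothetical classes, not the ActiveMQ API, and the assumption that sending stamps a new ID on the passed object is taken from the log. It does not by itself explain why the message stays in both queues.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: hypothetical classes, not the ActiveMQ API.
final class ResendSketch {

    // Mutable message whose ID is assigned by whoever dispatches or sends it.
    static final class Msg {
        String id;
        final String text;
        Msg(String id, String text) { this.id = id; this.text = text; }
    }

    // Toy broker: remembers dispatched IDs and checks acks against them.
    static final class Broker {
        private final Set<String> dispatched = new HashSet<>();
        private int nextId = 1;

        Msg dispatch(String text) {
            Msg m = new Msg("in-" + nextId++, text);
            dispatched.add(m.id);
            return m;
        }

        // Assumption from the log: sending overwrites the ID of the object passed in.
        void send(Msg m) {
            m.id = "out-" + nextId++;
        }

        // True only if the acked ID matches a dispatched message.
        boolean ack(Msg m) {
            return dispatched.remove(m.id);
        }
    }

    // First version: forward the received instance itself.
    static boolean forwardSameInstance() {
        Broker b = new Broker();
        Msg received = b.dispatch("hello");
        b.send(received);                       // received.id is overwritten here
        return b.ack(received);                 // ack carries the new ID
    }

    // Second version: forward a fresh message built from the received text.
    static boolean forwardCopy() {
        Broker b = new Broker();
        Msg received = b.dispatch("hello");
        b.send(new Msg(null, received.text));   // received.id is untouched
        return b.ack(received);                 // ack carries the dispatched ID
    }

    public static void main(String[] args) {
        System.out.println("same instance acked: " + forwardSameInstance()); // false
        System.out.println("copy acked: " + forwardCopy());                  // true
    }
}
```

In this model only the second variant produces an acknowledgment the broker can correlate, which matches the "Could not correlate acknowledgment with dispatched message" entry seen with the first variant.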
--
Thanks,
Eugene Prokopiev