You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Jarosław Pałka <jp...@gmail.com> on 2009/03/26 10:01:26 UTC

Embedded broker hangs after restart

Hi,

I have strange behavior of embedded ActiveMQ 5.2 broker.

I have an web application running under Jetty 1.6.14 (with Spring 2.5.6). I
have embedded broker that is connected to ActiveMQ server through JMS
connector.I have to topics INCOMING and OUTGOING that retrieve and forward
messages to ActiveMQ server.
I use XBeanBrokerService to configure embedded broker. The configuration of
embedded broker is in attachment.

Everything runs great until restart of Jetty, once restarted web application
is able to receive messages but it doesn't send messages. I got following
exception logged in ActiveMQ server:

ERROR RecoveryListenerAdapter        - Message id
ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered from the
data store - already dispatched
ERROR Service                        - Async error occurred:
javax.jms.JMSException: Could not correlate acknowledgment with dispatched
message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
transactionId = null, messageCount = 0}
javax.jms.JMSException: Could not correlate acknowledgment with dispatched
message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
transactionId = null, messageCount = 0}
        at
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
        at
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
        at
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
        at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
        at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at
org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:619)

and similar message at web application side during shutdown:

2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
2009-03-26 09:23:10.975::INFO:  Graceful shutdown
SelectChannelConnector@0.0.0.0:8981
140421 [ActiveMQ ShutdownHook] INFO
org.apache.activemq.broker.BrokerService  - ActiveMQ Message Broker
(localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
140421 [ActiveMQ ShutdownHook] DEBUG
org.apache.activemq.broker.BrokerService  - Caught exception, must be
shutting down: java.lang.IllegalStateException: Shutdown in progress
2009-03-26 09:23:10.987::INFO:  Graceful shutdown
org.mortbay.jetty.webapp.WebAppContext@173831b
{/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
2009-03-26 09:23:10.987::INFO:  Graceful shutdown
org.mortbay.jetty.webapp.WebAppContext@1abab88
{/,/home/jpalka/jetty-6.1.14/webapps/test}
2009-03-26 09:23:10.987::INFO:  Graceful shutdown
org.mortbay.jetty.handler.ContextHandler@b1b4c3
{/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
2009-03-26 09:23:10.987::INFO:  Graceful shutdown
org.mortbay.jetty.webapp.WebAppContext@5e5f92
{/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
2009-03-26 09:23:10.987::INFO:  Graceful shutdown
org.mortbay.jetty.webapp.WebAppContext@1d4ab0e
{/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
2009-03-26 09:23:10.987::INFO:  Graceful shutdown
org.mortbay.jetty.webapp.WebAppContext@12a585c
{/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
2009-03-26 09:23:10.987::INFO:  Graceful shutdown
org.mortbay.jetty.webapp.WebAppContext@10f3801
{/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
2009-03-26 09:23:10.987::INFO:  Graceful shutdown
org.mortbay.jetty.webapp.WebAppContext@2606b8
{/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
140444 [ActiveMQ ShutdownHook] INFO
org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:0
Stopped
140447 [ActiveMQ ShutdownHook] INFO
org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:1
Stopped
140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
org.apache.activemq.broker.TransportConnection  - Stopping connection:
vm://localhost#0
140452 [VMTransport] DEBUG org.apache.activemq.store.amq.AMQMessageStore  -
flush starting ...
140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG
org.apache.activemq.ActiveMQConnection  - Async exception with no exception
listener: javax.jms.JMSException: Could not correlate acknowledgment with
dispatched message: MessageAck {commandId = 8, responseRequired = false,
ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
transactionId = null, messageCount = 0}
javax.jms.JMSException: Could not correlate acknowledgment with dispatched
message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
transactionId = null, messageCount = 0}
        at
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
        at
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
        at
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
        at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
        at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at
org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
        at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
        at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:619)
140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection  - Async
exception with no exception listener:
org.apache.activemq.transport.TransportDisposedIOException: Peer
(vm://localhost#1) disposed.
org.apache.activemq.transport.TransportDisposedIOException: Peer
(vm://localhost#1) disposed.
        at
org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
        at
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
        at
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)


The only workaround I found is to remove activemq-data directory at web
application site before starting Jetty again.

Can you tell me what I'm doing wrong? I have not seen such behavior with
ActiveMQ 5.1.

Regards,
Jarek

Re: Embedded broker hangs after restart

Posted by Jarosław Pałka <jp...@gmail.com>.
Gary,

I have attached the test case as you asked.

Is there any other way I can help to resolve this issue?

Regards,
Jarek

W dniu 26 marca 2009 23:59 użytkownik Gary Tully <ga...@gmail.com>napisał:

> Hi Jarek,
> great that it reproduced easily. That test case should be fine.
> I have created a jira issue to track this:
> https://issues.apache.org/activemq/browse/AMQ-2185
>
> Could you attach your test case to the issue (file attach) and grant the
> ASF
> license to your test. (tick the check box)
>
> thanks,
> Gary.
>
> 2009/3/26 Jarosław Pałka <jp...@gmail.com>
>
> > Gary,
> >
> > It was not so hard to reproduce this exception as it looked to me in the
> > beginning. Maybe it is not the best test case I have every written in my
> > life, but at least I get the same error message.
> >
> > package com.assembla.client.impl.tests;
> >
> > import java.io.File;
> > import java.net.URISyntaxException;
> > import java.net.UnknownHostException;
> >
> > import javax.jms.DeliveryMode;
> > import javax.jms.Message;
> > import javax.jms.MessageListener;
> > import javax.jms.Session;
> > import javax.jms.TextMessage;
> > import javax.jms.TopicConnection;
> > import javax.jms.TopicConnectionFactory;
> > import javax.jms.TopicPublisher;
> > import javax.jms.TopicSession;
> > import javax.jms.TopicSubscriber;
> >
> > import junit.framework.TestCase;
> >
> > import org.apache.activemq.ActiveMQConnectionFactory;
> > import org.apache.activemq.broker.BrokerService;
> > import org.apache.activemq.command.ActiveMQTopic;
> > import org.apache.activemq.network.jms.JmsTopicConnector;
> > import org.apache.activemq.network.jms.OutboundTopicBridge;
> > import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
> >
> > public class ActiveMQJMSConnectorTest extends TestCase {
> >     private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
> >     private BrokerService serverBroker;
> >    private BrokerService clientBroker;
> >
> >    @Override
> >    protected void setUp() throws Exception {
> >
> >        serverBroker = new BrokerService();
> >        serverBroker.setUseJmx(false);
> >        serverBroker.setPersistent(true);
> >        serverBroker.addConnector("tcp://localhost:61616");
> >        serverBroker.setDataDirectory("server-broker");
> >
> >        serverBroker.start();
> >
> >        clientBroker = new BrokerService();
> >        clientBroker.setUseJmx(false);
> >        clientBroker.setPersistent(true);
> >
> >        AMQPersistenceAdapterFactory persistenceFactory = new
> > AMQPersistenceAdapterFactory();
> >        persistenceFactory.setDataDirectory(new File("client-broker"));
> >        persistenceFactory.setPersistentIndex(false);
> >
> >        clientBroker.setPersistenceFactory(persistenceFactory);
> >        clientBroker.addJmsConnector(outgoingTopicConnector());
> >        clientBroker.addConnector("vm://localhost");
> >        clientBroker.start();
> >
> >    }
> >
> >    private JmsTopicConnector outgoingTopicConnector()
> >             throws UnknownHostException, URISyntaxException {
> >        JmsTopicConnector topicConnector = new JmsTopicConnector();
> >        topicConnector
> >
> > .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
> >        topicConnector.setLocalClientId("local");
> >        topicConnector
> >                .setOutboundTopicBridges(new OutboundTopicBridge[] {
> > outboundTopicBridge() });
> >        return topicConnector;
> >    }
> >
> >     private TopicConnectionFactory outboundTopicConnectionFactory()
> >             throws UnknownHostException, URISyntaxException {
> >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > ActiveMQConnectionFactory(
> >                getAMQBrokerURL());
> >        activeMQConnectionFactory.setClientID(getClientID());
> >        return activeMQConnectionFactory;
> >    }
> >
> >     private OutboundTopicBridge outboundTopicBridge() throws
> > URISyntaxException {
> >        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
> >        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
> >        topicBridge.setOutboundTopicName(getOutboundTopicName());
> >        topicBridge.setConsumerName(getConsumerName());
> >        return topicBridge;
> >    }
> >
> >     private String getClientID() {
> >        return "embedded-client";
> >    }
> >
> >    private String getAMQBrokerURL() {
> >        return "tcp://localhost:61616";
> >    }
> >
> >    private String getAppID() {
> >        return "subtopic";
> >    }
> >
> >    private String getOutboundTopicName() throws URISyntaxException {
> >        String topicName = "OUT." + getAppID();
> >        return topicName;
> >    }
> >
> >    private String getConsumerName() throws URISyntaxException {
> >        String consumerName = getAppID();
> >        return consumerName;
> >    }
> >
> >    public void testMessage() throws Exception {
> >
> >        ActiveMQConnectionFactory serverFcty = new
> > ActiveMQConnectionFactory(
> >                "tcp://localhost:61616");
> >        TopicConnection serverCnn = serverFcty.createTopicConnection();
> >        serverCnn.setClientID("serverCnn");
> >        serverCnn.start();
> >
> >        TopicSession serverSession = serverCnn.createTopicSession(false,
> > Session.AUTO_ACKNOWLEDGE);
> >        TopicSubscriber serverSubscriber =
> > serverSession.createDurableSubscriber(new
> > ActiveMQTopic(getOutboundTopicName()), "serverCnn");
> >        serverSubscriber.setMessageListener(new MessageListener(){
> >
> >            @Override
> >            public void onMessage(Message arg0) {
> >                System.out.println(arg0);
> >            }});
> >
> >        ActiveMQConnectionFactory clientFcty = new
> > ActiveMQConnectionFactory("vm://localhost");
> >        TopicConnection clientCnn = clientFcty.createTopicConnection();
> >        clientCnn.start();
> >
> >        TopicSession clientSession = clientCnn.createTopicSession(false,
> > Session.AUTO_ACKNOWLEDGE);
> >
> >        TextMessage msg = clientSession.createTextMessage("<message/>");
> >        msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
> >        TopicPublisher publisher = clientSession.createPublisher(new
> > ActiveMQTopic(OUTGOING_LOCAL_TOPIC));
> >        publisher.send(msg);
> >
> >        Thread.sleep(5000);
> >    }
> >
> >    @Override
> >    protected void tearDown() throws Exception {
> >        clientBroker.stop();
> >
> >        serverBroker.start();
> >    }
> >
> > }
> >
> > Regards,
> > Jarek
> >
> > W dniu 26 marca 2009 18:38 użytkownik Gary Tully <gary.tully@gmail.com
> > >napisał:
> >
> > > Jarek,
> > > good news, but that exception is ugly. It would be smashing if you
> could
> > > reproduce in a Junit tests case using just the JMS apis, but it may be
> > > difficult to reproduce.
> > >
> > > I think I need to turn the queue test case from AMQ-2149 into a Topic
> > > scenario. That may reproduce.
> > >
> > > 2009/3/26 Jarosław Pałka <jp...@gmail.com>
> > >
> > > > Gary,
> > > >
> > > > I did recommended changes. On the embedded broker side everything
> looks
> > > > fine
> > > > now. I restarted Jetty (embedded broker) many times and it worked.
> > > >
> > > > What is interesting I started to see some exceptions in my other
> > > > application
> > > > that is listening on the same ActiveMQ server. I think below schema
> > will
> > > > help to explain my topology,
> > > >
> > > > client ---> topic://OUTGOING (embedded broker with JMS connector) -->
> > > > topic://OUT.subtopic (ActiveMQ server) --> Mule ESB
> > > >
> > > > When message is sent from client to MuleESB, after implementing your
> > > > changes
> > > > I started to see follwoing message:
> > > >
> > > > 19761 [ActiveMQ Session Task] DEBUG
> > > > org.mule.transport.jms.MultiConsumerJmsMessageReceiver  - Message
> > > Received
> > > > from: jms://topic:OUT.*
> > > > 19761 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  -
> > There
> > > is
> > > > no session id on the request using key: ID. Generating new session
> id:
> > > > 575fc82b-1a0f-11de-8ce7-c7a955038e40
> > > > 19762 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  -
> > > > dispatching event to service: service.que-outgoing, event is:
> > MuleEvent:
> > > > 575fc82c-1a0f-11de-8ce7-c7a955038e40, sync=false, stop processing=f
> > > > alse, DefaultInboundEndpoint{endpointUri=jms://topic:OUT.*,
> > > > connector=ActiveMQJmsConnector{this=ad97f5, started=true,
> > > initialised=true,
> > > > name='jmsConnector', disposed=false,
> numberOfConcurrentTransactedReceiv
> > > > ers=4, createMultipleTransactedReceivers=true, connected=true,
> > > > supportedProtocols=[jms], serviceOverrides=null},
> > > > transformer=[JMSMessageToObject{this=170b819,
> > name='JMSMessageToObject',
> > > > ignoreBadInput=false,
> > > >  returnClass=class java.lang.Object, sourceTypes=[interface
> > > > javax.jms.Message, interface javax.jms.TextMessage, interface
> > > > javax.jms.ObjectMessage, interface javax.jms.BytesMessage, interface
> > > > javax.jms.MapMes
> > > > sage, interface javax.jms.StreamMessage]},
> > > XML2UMMTransformer{this=146ad8b,
> > > > name='xml2umm', ignoreBadInput=false, returnClass=class
> > java.lang.Object,
> > > > sourceTypes=[]}, ExtractHeadersTransformer{this=12b9f14,
> > > > name='extract-headers', ignoreBadInput=false, returnClass=class
> > > > java.lang.Object, sourceTypes=[]}], name='endpoint.jms.OUT',
> > > > properties={durableName=cozaciota},
> > > > transactionConfig=Transaction{factory=null, ac
> > > > tion=NEVER, timeout=0}, filter=null, deleteUnacceptedMessages=false,
> > > > securityFilter=null, synchronous=false, initialState=started,
> > > > remoteSync=false, remoteSyncTimeout=10000, endpointEncoding=UTF-8}
> > > > 10459 [ActiveMQ Connection Worker: vm://localhost#2] DEBUG
> > > > org.apache.activemq.ActiveMQConnection  - Async exception with no
> > > exception
> > > > listener: java.lang.NullPointerException
> > > > java.lang.NullPointerException
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.kaha.impl.index.VMIndexLinkedList.remove(VMIndexLinkedList.java:265)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:314)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:695)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(KahaTopicReferenceStore.java:148)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:163)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:91)
> > > >        at
> > > > org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:447)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.DurableTopicSubscription.acknowledge(DurableTopicSubscription.java:223)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:238)
> > > >         at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > >        at
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >        at
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > >        at
> > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
> > > >         at
> > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > >        at
> > > >
> > >
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205)
> > > >        at
> > > >
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > > >        at
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > > >        at
> > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > > >        at
> > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > > >        at java.lang.Thread.run(Thread.java:619)
> > > >
> > > > Application is working correctly, the only thing is this exception in
> > > logs.
> > > > For testing purposes I'm running both client and Mule ESB under same
> > JVM.
> > > >
> > > > Jarek
> > > >
> > > >
> > > > W dniu 26 marca 2009 14:26 użytkownik Gary Tully <
> gary.tully@gmail.com
> > > > >napisał:
> > > >
> > > > > Hi Jarek,
> > > > > some similar behaviour is expressed by issue
> > > > > AMQ-2149<https://issues.apache.org/activemq/browse/AMQ-2149>.
> > > > >
> > > > > One aspect of the problem is related to the persistent index used
> by
> > > the
> > > > > Kaha store. You could try using a VM (in memory) index to see if it
> > > helps
> > > > > in
> > > > > your case.
> > > > >
> > > > > To configure set the setPersistentIndex attribute to false as
> > follows:
> > > > >
> > > > >        broker = ....
> > > > >        AMQPersistenceAdapterFactory persistenceFactory = new
> > > > > AMQPersistenceAdapterFactory();
> > > > >        persistenceFactory.setDataDirectory(dataDirFile);
> > > > >        persistenceFactory.setPersistentIndex(false);
> > > > >        broker.setPersistenceFactory(persistenceFactory);
> > > > >
> > > > > hope this helps,
> > > > > Gary.
> > > > >
> > > > > 2009/3/26 Jarosław Pałka <jp...@gmail.com>
> > > > >
> > > > > > Sorry I forgot to copy embedded broker configuration:
> > > > > >
> > > > > > public abstract class UMMClientConfiguration extends
> > > > > ConfigurationSupport{
> > > > > >
> > > > > >    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
> > > > > >    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
> > > > > >
> > > > > >    @Bean
> > > > > >    public BrokerService brokerService() throws Exception {
> > > > > >        XBeanBrokerService broker = new XBeanBrokerService();
> > > > > >        broker.setUseJmx(false);
> > > > > >        broker.setPersistent(true);
> > > > > >        //broker.
> > > > > >        broker.addJmsConnector(incomingTopicConnector());
> > > > > >        broker.addJmsConnector(outgoingTopicConnector());
> > > > > >        broker.addConnector("vm://localhost");
> > > > > >        broker.start();
> > > > > >        return broker;
> > > > > >    }
> > > > > >
> > > > > >     @Bean
> > > > > >    public JmsTopicConnector outgoingTopicConnector()
> > > > > >            throws UnknownHostException, URISyntaxException {
> > > > > >        JmsTopicConnector topicConnector = new
> JmsTopicConnector();
> > > > > >        topicConnector
> > > > > >
> > > > > >
> > .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
> > > > > >        topicConnector.setLocalClientId("local");
> > > > > >        topicConnector
> > > > > >                .setOutboundTopicBridges(new OutboundTopicBridge[]
> {
> > > > > > outboundTopicBridge() });
> > > > > >        return topicConnector;
> > > > > >    }
> > > > > >
> > > > > >    @Bean
> > > > > >    public TopicConnectionFactory outboundTopicConnectionFactory()
> > > > > >            throws UnknownHostException, URISyntaxException {
> > > > > >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > > > > > ActiveMQConnectionFactory(
> > > > > >                getAMQBrokerURL());
> > > > > >        activeMQConnectionFactory.setClientID(getClientID());
> > > > > >        return activeMQConnectionFactory;
> > > > > >    }
> > > > > >
> > > > > >    @Bean
> > > > > >    public OutboundTopicBridge outboundTopicBridge() throws
> > > > > > URISyntaxException {
> > > > > >        OutboundTopicBridge topicBridge = new
> OutboundTopicBridge();
> > > > > >        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
> > > > > >        topicBridge.setOutboundTopicName(getOutboundTopicName());
> > > > > >        topicBridge.setConsumerName(getConsumerName());
> > > > > >        return topicBridge;
> > > > > >    }
> > > > > >
> > > > > >    @Bean
> > > > > >    public JmsTemplate jmsTemplate() throws UnknownHostException,
> > > > > >            URISyntaxException {
> > > > > >        JmsTemplate jmsTemplate = new JmsTemplate(
> > > > > >                localOutgoingTopicConnectionFactory());
> > > > > >
>  jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
> > > > > >        jmsTemplate.setDeliveryPersistent(true);
> > > > > >        jmsTemplate.setPubSubDomain(true);
> > > > > >        return jmsTemplate;
> > > > > >    }
> > > > > >
> > > > > >      @Bean(dependsOn = "brokerService")
> > > > > >    public TopicConnectionFactory
> > > localOutgoingTopicConnectionFactory()
> > > > > >            throws UnknownHostException, URISyntaxException {
> > > > > >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > > > > > ActiveMQConnectionFactory(
> > > > > >                "vm://localhost");
> > > > > >
> > > >  activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
> > > > > >        return activeMQConnectionFactory;
> > > > > >    }
> > > > > > }
> > > > > >
> > > > > > Regards,
> > > > > > Jarek
> > > > > >
> > > > > > W dniu 26 marca 2009 10:01 użytkownik Jarosław Pałka <
> > > jpalka@gmail.com
> > > > > > >napisał:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I have strange behavior of embedded ActiveMQ 5.2 broker.
> > > > > > >
> > > > > > > I have an web application running under Jetty 1.6.14 (with
> Spring
> > > > > 2.5.6).
> > > > > > I
> > > > > > > have embedded broker that is connected to ActiveMQ server
> through
> > > JMS
> > > > > > > connector.I have to topics INCOMING and OUTGOING that retrieve
> > and
> > > > > > forward
> > > > > > > messages to ActiveMQ server.
> > > > > > > I use XBeanBrokerService to configure embedded broker. The
> > > > > configuration
> > > > > > of
> > > > > > > embedded broker is in attachment.
> > > > > > >
> > > > > > > Everything runs great until restart of Jetty, once restarted
> web
> > > > > > > application is able to receive messages but it doesn't send
> > > messages.
> > > > I
> > > > > > got
> > > > > > > following exception logged in ActiveMQ server:
> > > > > > >
> > > > > > > ERROR RecoveryListenerAdapter        - Message id
> > > > > > > ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be
> recovered
> > > from
> > > > > the
> > > > > > > data store - already dispatched
> > > > > > > ERROR Service                        - Async error occurred:
> > > > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > > > dispatched
> > > > > > > message: MessageAck {commandId = 8, responseRequired = false,
> > > ackType
> > > > =
> > > > > > 0,
> > > > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> firstMessageId
> > =
> > > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > > > topic://IN.ltbl,
> > > > > > > transactionId = null, messageCount = 0}
> > > > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > > > dispatched
> > > > > > > message: MessageAck {commandId = 8, responseRequired = false,
> > > ackType
> > > > =
> > > > > > 0,
> > > > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> firstMessageId
> > =
> > > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > > > topic://IN.ltbl,
> > > > > > > transactionId = null, messageCount = 0}
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > > > > >         at
> > > > > > >
> > > > >
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > > > >         at
> > > > > > >
> > > > >
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > > > > >         at
> > > > > > >
> org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > > > > >         at
> > > > > > >
> > > > >
> > >
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > > > >
> > > > > > > and similar message at web application side during shutdown:
> > > > > > >
> > > > > > > 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> > > > > > > 2009-03-26 09:23:10.975::INFO:  Graceful shutdown
> > > > > > > SelectChannelConnector@0.0.0.0:8981
> > > > > > > 140421 [ActiveMQ ShutdownHook] INFO
> > > > > > > org.apache.activemq.broker.BrokerService  - ActiveMQ Message
> > Broker
> > > > > > > (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > > > > > > 140421 [ActiveMQ ShutdownHook] DEBUG
> > > > > > > org.apache.activemq.broker.BrokerService  - Caught exception,
> > must
> > > be
> > > > > > > shutting down: java.lang.IllegalStateException: Shutdown in
> > > progress
> > > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > > org.mortbay.jetty.webapp.WebAppContext@173831b
> > > > > > > {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > > org.mortbay.jetty.webapp.WebAppContext@1abab88
> > > > > > > {/,/home/jpalka/jetty-6.1.14/webapps/test}
> > > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > > org.mortbay.jetty.handler.ContextHandler@b1b4c3
> > > > > > > {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > > org.mortbay.jetty.webapp.WebAppContext@5e5f92
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > > org.mortbay.jetty.webapp.WebAppContext@1d4ab0e
> > > > > > >
> > > >
> {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > > org.mortbay.jetty.webapp.WebAppContext@12a585c
> > > > > > > {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > > org.mortbay.jetty.webapp.WebAppContext@10f3801
> > > > > > >
> > > > >
> > >
> {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > > org.mortbay.jetty.webapp.WebAppContext@2606b8
> > > > > > >
> {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > > > > > > 140444 [ActiveMQ ShutdownHook] INFO
> > > > > > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> > > > > Connector:0
> > > > > > > Stopped
> > > > > > > 140447 [ActiveMQ ShutdownHook] INFO
> > > > > > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> > > > > Connector:1
> > > > > > > Stopped
> > > > > > > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
> > > > > > > org.apache.activemq.broker.TransportConnection  - Stopping
> > > > connection:
> > > > > > > vm://localhost#0
> > > > > > > 140452 [VMTransport] DEBUG
> > > > > org.apache.activemq.store.amq.AMQMessageStore
> > > > > >  -
> > > > > > > flush starting ...
> > > > > > > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616]
> DEBUG
> > > > > > > org.apache.activemq.ActiveMQConnection  - Async exception with
> no
> > > > > > exception
> > > > > > > listener: javax.jms.JMSException: Could not correlate
> > > acknowledgment
> > > > > with
> > > > > > > dispatched message: MessageAck {commandId = 8, responseRequired
> =
> > > > > false,
> > > > > > > ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> > > > > > > firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1,
> > > > lastMessageId
> > > > > =
> > > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > > > topic://IN.ltbl,
> > > > > > > transactionId = null, messageCount = 0}
> > > > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > > > dispatched
> > > > > > > message: MessageAck {commandId = 8, responseRequired = false,
> > > ackType
> > > > =
> > > > > > 0,
> > > > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> firstMessageId
> > =
> > > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > > > topic://IN.ltbl,
> > > > > > > transactionId = null, messageCount = 0}
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > > > > >         at
> > > > > > >
> > > > >
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > > > >         at
> > > > > > >
> > > > >
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > > > > >         at
> > > > > > >
> org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > > > > >         at
> > > > > > >
> > > > >
> > >
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > > > > 140465 [VMTransport] DEBUG
> org.apache.activemq.ActiveMQConnection
> > >  -
> > > > > > Async
> > > > > > > exception with no exception listener:
> > > > > > > org.apache.activemq.transport.TransportDisposedIOException:
> Peer
> > > > > > > (vm://localhost#1) disposed.
> > > > > > > org.apache.activemq.transport.TransportDisposedIOException:
> Peer
> > > > > > > (vm://localhost#1) disposed.
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > > > > > >         at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > > > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > > > >
> > > > > > >
> > > > > > > The only workaround I found is to remove activemq-data
> directory
> > at
> > > > web
> > > > > > > application site before starting Jetty again.
> > > > > > >
> > > > > > > Can you tell me what I'm doing wrong? I have not seen such
> > behavior
> > > > > with
> > > > > > > ActiveMQ 5.1.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Jarek
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > http://blog.garytully.com
> > > > >
> > > > > Open Source SOA
> > > > > http://FUSESource.com
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > http://blog.garytully.com
> > >
> > > Open Source SOA
> > > http://FUSESource.com
> > >
> >
>
>
>
> --
> http://blog.garytully.com
>
> Open Source SOA
> http://FUSESource.com
>

Re: Embedded broker hangs after restart

Posted by Gary Tully <ga...@gmail.com>.
Hi Jarek,
great that it reproduced easily. That test case should be fine.
I have created a jira issue to track this:
https://issues.apache.org/activemq/browse/AMQ-2185

Could you attach your test case to the issue (file attach) and grant the ASF
license to your test. (tick the check box)

thanks,
Gary.

2009/3/26 Jarosław Pałka <jp...@gmail.com>

> Gary,
>
> It was not so hard to reproduce this exception as it looked to me in the
> beginning. Maybe it is not the best test case I have every written in my
> life, but at least I get the same error message.
>
> package com.assembla.client.impl.tests;
>
> import java.io.File;
> import java.net.URISyntaxException;
> import java.net.UnknownHostException;
>
> import javax.jms.DeliveryMode;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.jms.TopicConnection;
> import javax.jms.TopicConnectionFactory;
> import javax.jms.TopicPublisher;
> import javax.jms.TopicSession;
> import javax.jms.TopicSubscriber;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.command.ActiveMQTopic;
> import org.apache.activemq.network.jms.JmsTopicConnector;
> import org.apache.activemq.network.jms.OutboundTopicBridge;
> import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
>
> public class ActiveMQJMSConnectorTest extends TestCase {
>     private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
>     private BrokerService serverBroker;
>    private BrokerService clientBroker;
>
>    @Override
>    protected void setUp() throws Exception {
>
>        serverBroker = new BrokerService();
>        serverBroker.setUseJmx(false);
>        serverBroker.setPersistent(true);
>        serverBroker.addConnector("tcp://localhost:61616");
>        serverBroker.setDataDirectory("server-broker");
>
>        serverBroker.start();
>
>        clientBroker = new BrokerService();
>        clientBroker.setUseJmx(false);
>        clientBroker.setPersistent(true);
>
>        AMQPersistenceAdapterFactory persistenceFactory = new
> AMQPersistenceAdapterFactory();
>        persistenceFactory.setDataDirectory(new File("client-broker"));
>        persistenceFactory.setPersistentIndex(false);
>
>        clientBroker.setPersistenceFactory(persistenceFactory);
>        clientBroker.addJmsConnector(outgoingTopicConnector());
>        clientBroker.addConnector("vm://localhost");
>        clientBroker.start();
>
>    }
>
>    private JmsTopicConnector outgoingTopicConnector()
>             throws UnknownHostException, URISyntaxException {
>        JmsTopicConnector topicConnector = new JmsTopicConnector();
>        topicConnector
>
> .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
>        topicConnector.setLocalClientId("local");
>        topicConnector
>                .setOutboundTopicBridges(new OutboundTopicBridge[] {
> outboundTopicBridge() });
>        return topicConnector;
>    }
>
>     private TopicConnectionFactory outboundTopicConnectionFactory()
>             throws UnknownHostException, URISyntaxException {
>        ActiveMQConnectionFactory activeMQConnectionFactory = new
> ActiveMQConnectionFactory(
>                getAMQBrokerURL());
>        activeMQConnectionFactory.setClientID(getClientID());
>        return activeMQConnectionFactory;
>    }
>
>     private OutboundTopicBridge outboundTopicBridge() throws
> URISyntaxException {
>        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
>        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
>        topicBridge.setOutboundTopicName(getOutboundTopicName());
>        topicBridge.setConsumerName(getConsumerName());
>        return topicBridge;
>    }
>
>     private String getClientID() {
>        return "embedded-client";
>    }
>
>    private String getAMQBrokerURL() {
>        return "tcp://localhost:61616";
>    }
>
>    private String getAppID() {
>        return "subtopic";
>    }
>
>    private String getOutboundTopicName() throws URISyntaxException {
>        String topicName = "OUT." + getAppID();
>        return topicName;
>    }
>
>    private String getConsumerName() throws URISyntaxException {
>        String consumerName = getAppID();
>        return consumerName;
>    }
>
>    public void testMessage() throws Exception {
>
>        ActiveMQConnectionFactory serverFcty = new
> ActiveMQConnectionFactory(
>                "tcp://localhost:61616");
>        TopicConnection serverCnn = serverFcty.createTopicConnection();
>        serverCnn.setClientID("serverCnn");
>        serverCnn.start();
>
>        TopicSession serverSession = serverCnn.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
>        TopicSubscriber serverSubscriber =
> serverSession.createDurableSubscriber(new
> ActiveMQTopic(getOutboundTopicName()), "serverCnn");
>        serverSubscriber.setMessageListener(new MessageListener(){
>
>            @Override
>            public void onMessage(Message arg0) {
>                System.out.println(arg0);
>            }});
>
>        ActiveMQConnectionFactory clientFcty = new
> ActiveMQConnectionFactory("vm://localhost");
>        TopicConnection clientCnn = clientFcty.createTopicConnection();
>        clientCnn.start();
>
>        TopicSession clientSession = clientCnn.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>        TextMessage msg = clientSession.createTextMessage("<message/>");
>        msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
>        TopicPublisher publisher = clientSession.createPublisher(new
> ActiveMQTopic(OUTGOING_LOCAL_TOPIC));
>        publisher.send(msg);
>
>        Thread.sleep(5000);
>    }
>
>    @Override
>    protected void tearDown() throws Exception {
>        clientBroker.stop();
>
>        serverBroker.start();
>    }
>
> }
>
> Regards,
> Jarek
>
> W dniu 26 marca 2009 18:38 użytkownik Gary Tully <gary.tully@gmail.com
> >napisał:
>
> > Jarek,
> > good news, but that exception is ugly. It would be smashing if you could
> > reproduce in a Junit tests case using just the JMS apis, but it may be
> > difficult to reproduce.
> >
> > I think I need to turn the queue test case from AMQ-2149 into a Topic
> > scenario. That may reproduce.
> >
> > 2009/3/26 Jarosław Pałka <jp...@gmail.com>
> >
> > > Gary,
> > >
> > > I did recommended changes. On the embedded broker side everything looks
> > > fine
> > > now. I restarted Jetty (embedded broker) many times and it worked.
> > >
> > > What is interesting I started to see some exceptions in my other
> > > application
> > > that is listening on the same ActiveMQ server. I think below schema
> will
> > > help to explain my topology,
> > >
> > > client ---> topic://OUTGOING (embedded broker with JMS connector) -->
> > > topic://OUT.subtopic (ActiveMQ server) --> Mule ESB
> > >
> > > When message is sent from client to MuleESB, after implementing your
> > > changes
> > > I started to see follwoing message:
> > >
> > > 19761 [ActiveMQ Session Task] DEBUG
> > > org.mule.transport.jms.MultiConsumerJmsMessageReceiver  - Message
> > Received
> > > from: jms://topic:OUT.*
> > > 19761 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  -
> There
> > is
> > > no session id on the request using key: ID. Generating new session id:
> > > 575fc82b-1a0f-11de-8ce7-c7a955038e40
> > > 19762 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  -
> > > dispatching event to service: service.que-outgoing, event is:
> MuleEvent:
> > > 575fc82c-1a0f-11de-8ce7-c7a955038e40, sync=false, stop processing=f
> > > alse, DefaultInboundEndpoint{endpointUri=jms://topic:OUT.*,
> > > connector=ActiveMQJmsConnector{this=ad97f5, started=true,
> > initialised=true,
> > > name='jmsConnector', disposed=false, numberOfConcurrentTransactedReceiv
> > > ers=4, createMultipleTransactedReceivers=true, connected=true,
> > > supportedProtocols=[jms], serviceOverrides=null},
> > > transformer=[JMSMessageToObject{this=170b819,
> name='JMSMessageToObject',
> > > ignoreBadInput=false,
> > >  returnClass=class java.lang.Object, sourceTypes=[interface
> > > javax.jms.Message, interface javax.jms.TextMessage, interface
> > > javax.jms.ObjectMessage, interface javax.jms.BytesMessage, interface
> > > javax.jms.MapMes
> > > sage, interface javax.jms.StreamMessage]},
> > XML2UMMTransformer{this=146ad8b,
> > > name='xml2umm', ignoreBadInput=false, returnClass=class
> java.lang.Object,
> > > sourceTypes=[]}, ExtractHeadersTransformer{this=12b9f14,
> > > name='extract-headers', ignoreBadInput=false, returnClass=class
> > > java.lang.Object, sourceTypes=[]}], name='endpoint.jms.OUT',
> > > properties={durableName=cozaciota},
> > > transactionConfig=Transaction{factory=null, ac
> > > tion=NEVER, timeout=0}, filter=null, deleteUnacceptedMessages=false,
> > > securityFilter=null, synchronous=false, initialState=started,
> > > remoteSync=false, remoteSyncTimeout=10000, endpointEncoding=UTF-8}
> > > 10459 [ActiveMQ Connection Worker: vm://localhost#2] DEBUG
> > > org.apache.activemq.ActiveMQConnection  - Async exception with no
> > exception
> > > listener: java.lang.NullPointerException
> > > java.lang.NullPointerException
> > >        at
> > >
> > >
> >
> org.apache.activemq.kaha.impl.index.VMIndexLinkedList.remove(VMIndexLinkedList.java:265)
> > >        at
> > >
> > >
> >
> org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:314)
> > >        at
> > >
> > >
> >
> org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:695)
> > >        at
> > >
> > >
> >
> org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(KahaTopicReferenceStore.java:148)
> > >        at
> > >
> > >
> >
> org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:163)
> > >        at
> > >
> > >
> >
> org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:91)
> > >        at
> > > org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:447)
> > >        at
> > >
> > >
> >
> org.apache.activemq.broker.region.DurableTopicSubscription.acknowledge(DurableTopicSubscription.java:223)
> > >        at
> > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:238)
> > >         at
> > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > >        at
> > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > >        at
> > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > >        at
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >        at
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >        at
> > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > >        at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > >        at
> > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > >        at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > >        at
> > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > >        at
> > >
> > >
> >
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
> > >         at
> > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > >        at
> > >
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205)
> > >        at
> > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > >        at
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > >        at
> > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > >        at
> > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > >        at java.lang.Thread.run(Thread.java:619)
> > >
> > > Application is working correctly, the only thing is this exception in
> > logs.
> > > For testing purposes I'm running both client and Mule ESB under same
> JVM.
> > >
> > > Jarek
> > >
> > >
> > > W dniu 26 marca 2009 14:26 użytkownik Gary Tully <gary.tully@gmail.com
> > > >napisał:
> > >
> > > > Hi Jarek,
> > > > some similar behaviour is expressed by issue
> > > > AMQ-2149<https://issues.apache.org/activemq/browse/AMQ-2149>.
> > > >
> > > > One aspect of the problem is related to the persistent index used by
> > the
> > > > Kaha store. You could try using a VM (in memory) index to see if it
> > helps
> > > > in
> > > > your case.
> > > >
> > > > To configure set the setPersistentIndex attribute to false as
> follows:
> > > >
> > > >        broker = ....
> > > >        AMQPersistenceAdapterFactory persistenceFactory = new
> > > > AMQPersistenceAdapterFactory();
> > > >        persistenceFactory.setDataDirectory(dataDirFile);
> > > >        persistenceFactory.setPersistentIndex(false);
> > > >        broker.setPersistenceFactory(persistenceFactory);
> > > >
> > > > hope this helps,
> > > > Gary.
> > > >
> > > > 2009/3/26 Jarosław Pałka <jp...@gmail.com>
> > > >
> > > > > Sorry I forgot to copy embedded broker configuration:
> > > > >
> > > > > public abstract class UMMClientConfiguration extends
> > > > ConfigurationSupport{
> > > > >
> > > > >    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
> > > > >    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
> > > > >
> > > > >    @Bean
> > > > >    public BrokerService brokerService() throws Exception {
> > > > >        XBeanBrokerService broker = new XBeanBrokerService();
> > > > >        broker.setUseJmx(false);
> > > > >        broker.setPersistent(true);
> > > > >        //broker.
> > > > >        broker.addJmsConnector(incomingTopicConnector());
> > > > >        broker.addJmsConnector(outgoingTopicConnector());
> > > > >        broker.addConnector("vm://localhost");
> > > > >        broker.start();
> > > > >        return broker;
> > > > >    }
> > > > >
> > > > >     @Bean
> > > > >    public JmsTopicConnector outgoingTopicConnector()
> > > > >            throws UnknownHostException, URISyntaxException {
> > > > >        JmsTopicConnector topicConnector = new JmsTopicConnector();
> > > > >        topicConnector
> > > > >
> > > > >
> .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
> > > > >        topicConnector.setLocalClientId("local");
> > > > >        topicConnector
> > > > >                .setOutboundTopicBridges(new OutboundTopicBridge[] {
> > > > > outboundTopicBridge() });
> > > > >        return topicConnector;
> > > > >    }
> > > > >
> > > > >    @Bean
> > > > >    public TopicConnectionFactory outboundTopicConnectionFactory()
> > > > >            throws UnknownHostException, URISyntaxException {
> > > > >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > > > > ActiveMQConnectionFactory(
> > > > >                getAMQBrokerURL());
> > > > >        activeMQConnectionFactory.setClientID(getClientID());
> > > > >        return activeMQConnectionFactory;
> > > > >    }
> > > > >
> > > > >    @Bean
> > > > >    public OutboundTopicBridge outboundTopicBridge() throws
> > > > > URISyntaxException {
> > > > >        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
> > > > >        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
> > > > >        topicBridge.setOutboundTopicName(getOutboundTopicName());
> > > > >        topicBridge.setConsumerName(getConsumerName());
> > > > >        return topicBridge;
> > > > >    }
> > > > >
> > > > >    @Bean
> > > > >    public JmsTemplate jmsTemplate() throws UnknownHostException,
> > > > >            URISyntaxException {
> > > > >        JmsTemplate jmsTemplate = new JmsTemplate(
> > > > >                localOutgoingTopicConnectionFactory());
> > > > >        jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
> > > > >        jmsTemplate.setDeliveryPersistent(true);
> > > > >        jmsTemplate.setPubSubDomain(true);
> > > > >        return jmsTemplate;
> > > > >    }
> > > > >
> > > > >      @Bean(dependsOn = "brokerService")
> > > > >    public TopicConnectionFactory
> > localOutgoingTopicConnectionFactory()
> > > > >            throws UnknownHostException, URISyntaxException {
> > > > >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > > > > ActiveMQConnectionFactory(
> > > > >                "vm://localhost");
> > > > >
> > >  activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
> > > > >        return activeMQConnectionFactory;
> > > > >    }
> > > > > }
> > > > >
> > > > > Regards,
> > > > > Jarek
> > > > >
> > > > > W dniu 26 marca 2009 10:01 użytkownik Jarosław Pałka <
> > jpalka@gmail.com
> > > > > >napisał:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I have strange behavior of embedded ActiveMQ 5.2 broker.
> > > > > >
> > > > > > I have an web application running under Jetty 1.6.14 (with Spring
> > > > 2.5.6).
> > > > > I
> > > > > > have embedded broker that is connected to ActiveMQ server through
> > JMS
> > > > > > connector.I have to topics INCOMING and OUTGOING that retrieve
> and
> > > > > forward
> > > > > > messages to ActiveMQ server.
> > > > > > I use XBeanBrokerService to configure embedded broker. The
> > > > configuration
> > > > > of
> > > > > > embedded broker is in attachment.
> > > > > >
> > > > > > Everything runs great until restart of Jetty, once restarted web
> > > > > > application is able to receive messages but it doesn't send
> > messages.
> > > I
> > > > > got
> > > > > > following exception logged in ActiveMQ server:
> > > > > >
> > > > > > ERROR RecoveryListenerAdapter        - Message id
> > > > > > ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered
> > from
> > > > the
> > > > > > data store - already dispatched
> > > > > > ERROR Service                        - Async error occurred:
> > > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > > dispatched
> > > > > > message: MessageAck {commandId = 8, responseRequired = false,
> > ackType
> > > =
> > > > > 0,
> > > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId
> =
> > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > > topic://IN.ltbl,
> > > > > > transactionId = null, messageCount = 0}
> > > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > > dispatched
> > > > > > message: MessageAck {commandId = 8, responseRequired = false,
> > ackType
> > > =
> > > > > 0,
> > > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId
> =
> > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > > topic://IN.ltbl,
> > > > > > transactionId = null, messageCount = 0}
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > > > >         at
> > > > > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > > >
> > > > > > and similar message at web application side during shutdown:
> > > > > >
> > > > > > 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> > > > > > 2009-03-26 09:23:10.975::INFO:  Graceful shutdown
> > > > > > SelectChannelConnector@0.0.0.0:8981
> > > > > > 140421 [ActiveMQ ShutdownHook] INFO
> > > > > > org.apache.activemq.broker.BrokerService  - ActiveMQ Message
> Broker
> > > > > > (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > > > > > 140421 [ActiveMQ ShutdownHook] DEBUG
> > > > > > org.apache.activemq.broker.BrokerService  - Caught exception,
> must
> > be
> > > > > > shutting down: java.lang.IllegalStateException: Shutdown in
> > progress
> > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > org.mortbay.jetty.webapp.WebAppContext@173831b
> > > > > > {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > org.mortbay.jetty.webapp.WebAppContext@1abab88
> > > > > > {/,/home/jpalka/jetty-6.1.14/webapps/test}
> > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > org.mortbay.jetty.handler.ContextHandler@b1b4c3
> > > > > > {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > org.mortbay.jetty.webapp.WebAppContext@5e5f92
> > > > > >
> > > > >
> > > >
> > >
> >
> {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > org.mortbay.jetty.webapp.WebAppContext@1d4ab0e
> > > > > >
> > > {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > org.mortbay.jetty.webapp.WebAppContext@12a585c
> > > > > > {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > org.mortbay.jetty.webapp.WebAppContext@10f3801
> > > > > >
> > > >
> > {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > > org.mortbay.jetty.webapp.WebAppContext@2606b8
> > > > > > {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > > > > > 140444 [ActiveMQ ShutdownHook] INFO
> > > > > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> > > > Connector:0
> > > > > > Stopped
> > > > > > 140447 [ActiveMQ ShutdownHook] INFO
> > > > > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> > > > Connector:1
> > > > > > Stopped
> > > > > > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
> > > > > > org.apache.activemq.broker.TransportConnection  - Stopping
> > > connection:
> > > > > > vm://localhost#0
> > > > > > 140452 [VMTransport] DEBUG
> > > > org.apache.activemq.store.amq.AMQMessageStore
> > > > >  -
> > > > > > flush starting ...
> > > > > > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG
> > > > > > org.apache.activemq.ActiveMQConnection  - Async exception with no
> > > > > exception
> > > > > > listener: javax.jms.JMSException: Could not correlate
> > acknowledgment
> > > > with
> > > > > > dispatched message: MessageAck {commandId = 8, responseRequired =
> > > > false,
> > > > > > ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> > > > > > firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1,
> > > lastMessageId
> > > > =
> > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > > topic://IN.ltbl,
> > > > > > transactionId = null, messageCount = 0}
> > > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > > dispatched
> > > > > > message: MessageAck {commandId = 8, responseRequired = false,
> > ackType
> > > =
> > > > > 0,
> > > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId
> =
> > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > > topic://IN.ltbl,
> > > > > > transactionId = null, messageCount = 0}
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > > > >         at
> > > > > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > > > >         at
> > > > > >
> > > >
> > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > > > 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection
> >  -
> > > > > Async
> > > > > > exception with no exception listener:
> > > > > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > > > > (vm://localhost#1) disposed.
> > > > > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > > > > (vm://localhost#1) disposed.
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > > > > >         at
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > > >
> > > > > >
> > > > > > The only workaround I found is to remove activemq-data directory
> at
> > > web
> > > > > > application site before starting Jetty again.
> > > > > >
> > > > > > Can you tell me what I'm doing wrong? I have not seen such
> behavior
> > > > with
> > > > > > ActiveMQ 5.1.
> > > > > >
> > > > > > Regards,
> > > > > > Jarek
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > http://blog.garytully.com
> > > >
> > > > Open Source SOA
> > > > http://FUSESource.com
> > > >
> > >
> >
> >
> >
> > --
> > http://blog.garytully.com
> >
> > Open Source SOA
> > http://FUSESource.com
> >
>



-- 
http://blog.garytully.com

Open Source SOA
http://FUSESource.com

Re: Embedded broker hangs after restart

Posted by Jarosław Pałka <jp...@gmail.com>.
Gary,

It was not so hard to reproduce this exception as it looked to me in the
beginning. Maybe it is not the best test case I have every written in my
life, but at least I get the same error message.

package com.assembla.client.impl.tests;

import java.io.File;
import java.net.URISyntaxException;
import java.net.UnknownHostException;

import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.jms.JmsTopicConnector;
import org.apache.activemq.network.jms.OutboundTopicBridge;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;

public class ActiveMQJMSConnectorTest extends TestCase {
    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
    private BrokerService serverBroker;
    private BrokerService clientBroker;

    @Override
    protected void setUp() throws Exception {

        serverBroker = new BrokerService();
        serverBroker.setUseJmx(false);
        serverBroker.setPersistent(true);
        serverBroker.addConnector("tcp://localhost:61616");
        serverBroker.setDataDirectory("server-broker");

        serverBroker.start();

        clientBroker = new BrokerService();
        clientBroker.setUseJmx(false);
        clientBroker.setPersistent(true);

        AMQPersistenceAdapterFactory persistenceFactory = new
AMQPersistenceAdapterFactory();
        persistenceFactory.setDataDirectory(new File("client-broker"));
        persistenceFactory.setPersistentIndex(false);

        clientBroker.setPersistenceFactory(persistenceFactory);
        clientBroker.addJmsConnector(outgoingTopicConnector());
        clientBroker.addConnector("vm://localhost");
        clientBroker.start();

    }

    private JmsTopicConnector outgoingTopicConnector()
            throws UnknownHostException, URISyntaxException {
        JmsTopicConnector topicConnector = new JmsTopicConnector();
        topicConnector

.setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
        topicConnector.setLocalClientId("local");
        topicConnector
                .setOutboundTopicBridges(new OutboundTopicBridge[] {
outboundTopicBridge() });
        return topicConnector;
    }

    private TopicConnectionFactory outboundTopicConnectionFactory()
            throws UnknownHostException, URISyntaxException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new
ActiveMQConnectionFactory(
                getAMQBrokerURL());
        activeMQConnectionFactory.setClientID(getClientID());
        return activeMQConnectionFactory;
    }

    private OutboundTopicBridge outboundTopicBridge() throws
URISyntaxException {
        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
        topicBridge.setOutboundTopicName(getOutboundTopicName());
        topicBridge.setConsumerName(getConsumerName());
        return topicBridge;
    }

    private String getClientID() {
        return "embedded-client";
    }

    private String getAMQBrokerURL() {
        return "tcp://localhost:61616";
    }

    private String getAppID() {
        return "subtopic";
    }

    private String getOutboundTopicName() throws URISyntaxException {
        String topicName = "OUT." + getAppID();
        return topicName;
    }

    private String getConsumerName() throws URISyntaxException {
        String consumerName = getAppID();
        return consumerName;
    }

    public void testMessage() throws Exception {

        ActiveMQConnectionFactory serverFcty = new
ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        TopicConnection serverCnn = serverFcty.createTopicConnection();
        serverCnn.setClientID("serverCnn");
        serverCnn.start();

        TopicSession serverSession = serverCnn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber serverSubscriber =
serverSession.createDurableSubscriber(new
ActiveMQTopic(getOutboundTopicName()), "serverCnn");
        serverSubscriber.setMessageListener(new MessageListener(){

            @Override
            public void onMessage(Message arg0) {
                System.out.println(arg0);
            }});

        ActiveMQConnectionFactory clientFcty = new
ActiveMQConnectionFactory("vm://localhost");
        TopicConnection clientCnn = clientFcty.createTopicConnection();
        clientCnn.start();

        TopicSession clientSession = clientCnn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);

        TextMessage msg = clientSession.createTextMessage("<message/>");
        msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
        TopicPublisher publisher = clientSession.createPublisher(new
ActiveMQTopic(OUTGOING_LOCAL_TOPIC));
        publisher.send(msg);

        Thread.sleep(5000);
    }

    @Override
    protected void tearDown() throws Exception {
        clientBroker.stop();

        serverBroker.start();
    }

}

Regards,
Jarek

W dniu 26 marca 2009 18:38 użytkownik Gary Tully <ga...@gmail.com>napisał:

> Jarek,
> good news, but that exception is ugly. It would be smashing if you could
> reproduce in a Junit tests case using just the JMS apis, but it may be
> difficult to reproduce.
>
> I think I need to turn the queue test case from AMQ-2149 into a Topic
> scenario. That may reproduce.
>
> 2009/3/26 Jarosław Pałka <jp...@gmail.com>
>
> > Gary,
> >
> > I did recommended changes. On the embedded broker side everything looks
> > fine
> > now. I restarted Jetty (embedded broker) many times and it worked.
> >
> > What is interesting I started to see some exceptions in my other
> > application
> > that is listening on the same ActiveMQ server. I think below schema will
> > help to explain my topology,
> >
> > client ---> topic://OUTGOING (embedded broker with JMS connector) -->
> > topic://OUT.subtopic (ActiveMQ server) --> Mule ESB
> >
> > When message is sent from client to MuleESB, after implementing your
> > changes
> > I started to see follwoing message:
> >
> > 19761 [ActiveMQ Session Task] DEBUG
> > org.mule.transport.jms.MultiConsumerJmsMessageReceiver  - Message
> Received
> > from: jms://topic:OUT.*
> > 19761 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  - There
> is
> > no session id on the request using key: ID. Generating new session id:
> > 575fc82b-1a0f-11de-8ce7-c7a955038e40
> > 19762 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  -
> > dispatching event to service: service.que-outgoing, event is: MuleEvent:
> > 575fc82c-1a0f-11de-8ce7-c7a955038e40, sync=false, stop processing=f
> > alse, DefaultInboundEndpoint{endpointUri=jms://topic:OUT.*,
> > connector=ActiveMQJmsConnector{this=ad97f5, started=true,
> initialised=true,
> > name='jmsConnector', disposed=false, numberOfConcurrentTransactedReceiv
> > ers=4, createMultipleTransactedReceivers=true, connected=true,
> > supportedProtocols=[jms], serviceOverrides=null},
> > transformer=[JMSMessageToObject{this=170b819, name='JMSMessageToObject',
> > ignoreBadInput=false,
> >  returnClass=class java.lang.Object, sourceTypes=[interface
> > javax.jms.Message, interface javax.jms.TextMessage, interface
> > javax.jms.ObjectMessage, interface javax.jms.BytesMessage, interface
> > javax.jms.MapMes
> > sage, interface javax.jms.StreamMessage]},
> XML2UMMTransformer{this=146ad8b,
> > name='xml2umm', ignoreBadInput=false, returnClass=class java.lang.Object,
> > sourceTypes=[]}, ExtractHeadersTransformer{this=12b9f14,
> > name='extract-headers', ignoreBadInput=false, returnClass=class
> > java.lang.Object, sourceTypes=[]}], name='endpoint.jms.OUT',
> > properties={durableName=cozaciota},
> > transactionConfig=Transaction{factory=null, ac
> > tion=NEVER, timeout=0}, filter=null, deleteUnacceptedMessages=false,
> > securityFilter=null, synchronous=false, initialState=started,
> > remoteSync=false, remoteSyncTimeout=10000, endpointEncoding=UTF-8}
> > 10459 [ActiveMQ Connection Worker: vm://localhost#2] DEBUG
> > org.apache.activemq.ActiveMQConnection  - Async exception with no
> exception
> > listener: java.lang.NullPointerException
> > java.lang.NullPointerException
> >        at
> >
> >
> org.apache.activemq.kaha.impl.index.VMIndexLinkedList.remove(VMIndexLinkedList.java:265)
> >        at
> >
> >
> org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:314)
> >        at
> >
> >
> org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:695)
> >        at
> >
> >
> org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(KahaTopicReferenceStore.java:148)
> >        at
> >
> >
> org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:163)
> >        at
> >
> >
> org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:91)
> >        at
> > org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:447)
> >        at
> >
> >
> org.apache.activemq.broker.region.DurableTopicSubscription.acknowledge(DurableTopicSubscription.java:223)
> >        at
> >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:238)
> >         at
> >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> >        at
> >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> >        at
> >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> >        at
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >        at
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >        at
> >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> >        at
> >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> >        at
> org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> >        at
> >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> >        at
> >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> >        at
> >
> >
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
> >         at
> >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> >        at
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205)
> >        at
> >
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> >        at
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> >        at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> >        at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> >        at java.lang.Thread.run(Thread.java:619)
> >
> > Application is working correctly, the only thing is this exception in
> logs.
> > For testing purposes I'm running both client and Mule ESB under same JVM.
> >
> > Jarek
> >
> >
> > W dniu 26 marca 2009 14:26 użytkownik Gary Tully <gary.tully@gmail.com
> > >napisał:
> >
> > > Hi Jarek,
> > > some similar behaviour is expressed by issue
> > > AMQ-2149<https://issues.apache.org/activemq/browse/AMQ-2149>.
> > >
> > > One aspect of the problem is related to the persistent index used by
> the
> > > Kaha store. You could try using a VM (in memory) index to see if it
> helps
> > > in
> > > your case.
> > >
> > > To configure set the setPersistentIndex attribute to false as follows:
> > >
> > >        broker = ....
> > >        AMQPersistenceAdapterFactory persistenceFactory = new
> > > AMQPersistenceAdapterFactory();
> > >        persistenceFactory.setDataDirectory(dataDirFile);
> > >        persistenceFactory.setPersistentIndex(false);
> > >        broker.setPersistenceFactory(persistenceFactory);
> > >
> > > hope this helps,
> > > Gary.
> > >
> > > 2009/3/26 Jarosław Pałka <jp...@gmail.com>
> > >
> > > > Sorry I forgot to copy embedded broker configuration:
> > > >
> > > > public abstract class UMMClientConfiguration extends
> > > ConfigurationSupport{
> > > >
> > > >    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
> > > >    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
> > > >
> > > >    @Bean
> > > >    public BrokerService brokerService() throws Exception {
> > > >        XBeanBrokerService broker = new XBeanBrokerService();
> > > >        broker.setUseJmx(false);
> > > >        broker.setPersistent(true);
> > > >        //broker.
> > > >        broker.addJmsConnector(incomingTopicConnector());
> > > >        broker.addJmsConnector(outgoingTopicConnector());
> > > >        broker.addConnector("vm://localhost");
> > > >        broker.start();
> > > >        return broker;
> > > >    }
> > > >
> > > >     @Bean
> > > >    public JmsTopicConnector outgoingTopicConnector()
> > > >            throws UnknownHostException, URISyntaxException {
> > > >        JmsTopicConnector topicConnector = new JmsTopicConnector();
> > > >        topicConnector
> > > >
> > > > .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
> > > >        topicConnector.setLocalClientId("local");
> > > >        topicConnector
> > > >                .setOutboundTopicBridges(new OutboundTopicBridge[] {
> > > > outboundTopicBridge() });
> > > >        return topicConnector;
> > > >    }
> > > >
> > > >    @Bean
> > > >    public TopicConnectionFactory outboundTopicConnectionFactory()
> > > >            throws UnknownHostException, URISyntaxException {
> > > >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > > > ActiveMQConnectionFactory(
> > > >                getAMQBrokerURL());
> > > >        activeMQConnectionFactory.setClientID(getClientID());
> > > >        return activeMQConnectionFactory;
> > > >    }
> > > >
> > > >    @Bean
> > > >    public OutboundTopicBridge outboundTopicBridge() throws
> > > > URISyntaxException {
> > > >        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
> > > >        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
> > > >        topicBridge.setOutboundTopicName(getOutboundTopicName());
> > > >        topicBridge.setConsumerName(getConsumerName());
> > > >        return topicBridge;
> > > >    }
> > > >
> > > >    @Bean
> > > >    public JmsTemplate jmsTemplate() throws UnknownHostException,
> > > >            URISyntaxException {
> > > >        JmsTemplate jmsTemplate = new JmsTemplate(
> > > >                localOutgoingTopicConnectionFactory());
> > > >        jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
> > > >        jmsTemplate.setDeliveryPersistent(true);
> > > >        jmsTemplate.setPubSubDomain(true);
> > > >        return jmsTemplate;
> > > >    }
> > > >
> > > >      @Bean(dependsOn = "brokerService")
> > > >    public TopicConnectionFactory
> localOutgoingTopicConnectionFactory()
> > > >            throws UnknownHostException, URISyntaxException {
> > > >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > > > ActiveMQConnectionFactory(
> > > >                "vm://localhost");
> > > >
> >  activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
> > > >        return activeMQConnectionFactory;
> > > >    }
> > > > }
> > > >
> > > > Regards,
> > > > Jarek
> > > >
> > > > W dniu 26 marca 2009 10:01 użytkownik Jarosław Pałka <
> jpalka@gmail.com
> > > > >napisał:
> > > >
> > > > > Hi,
> > > > >
> > > > > I have strange behavior of embedded ActiveMQ 5.2 broker.
> > > > >
> > > > > I have an web application running under Jetty 1.6.14 (with Spring
> > > 2.5.6).
> > > > I
> > > > > have embedded broker that is connected to ActiveMQ server through
> JMS
> > > > > connector.I have to topics INCOMING and OUTGOING that retrieve and
> > > > forward
> > > > > messages to ActiveMQ server.
> > > > > I use XBeanBrokerService to configure embedded broker. The
> > > configuration
> > > > of
> > > > > embedded broker is in attachment.
> > > > >
> > > > > Everything runs great until restart of Jetty, once restarted web
> > > > > application is able to receive messages but it doesn't send
> messages.
> > I
> > > > got
> > > > > following exception logged in ActiveMQ server:
> > > > >
> > > > > ERROR RecoveryListenerAdapter        - Message id
> > > > > ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered
> from
> > > the
> > > > > data store - already dispatched
> > > > > ERROR Service                        - Async error occurred:
> > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > dispatched
> > > > > message: MessageAck {commandId = 8, responseRequired = false,
> ackType
> > =
> > > > 0,
> > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > topic://IN.ltbl,
> > > > > transactionId = null, messageCount = 0}
> > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > dispatched
> > > > > message: MessageAck {commandId = 8, responseRequired = false,
> ackType
> > =
> > > > 0,
> > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > topic://IN.ltbl,
> > > > > transactionId = null, messageCount = 0}
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > > >         at
> > > > >
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > >         at
> > > > >
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > > >         at
> > > > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > > >         at
> > > > >
> > >
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > >
> > > > > and similar message at web application side during shutdown:
> > > > >
> > > > > 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> > > > > 2009-03-26 09:23:10.975::INFO:  Graceful shutdown
> > > > > SelectChannelConnector@0.0.0.0:8981
> > > > > 140421 [ActiveMQ ShutdownHook] INFO
> > > > > org.apache.activemq.broker.BrokerService  - ActiveMQ Message Broker
> > > > > (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > > > > 140421 [ActiveMQ ShutdownHook] DEBUG
> > > > > org.apache.activemq.broker.BrokerService  - Caught exception, must
> be
> > > > > shutting down: java.lang.IllegalStateException: Shutdown in
> progress
> > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > org.mortbay.jetty.webapp.WebAppContext@173831b
> > > > > {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > org.mortbay.jetty.webapp.WebAppContext@1abab88
> > > > > {/,/home/jpalka/jetty-6.1.14/webapps/test}
> > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > org.mortbay.jetty.handler.ContextHandler@b1b4c3
> > > > > {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > org.mortbay.jetty.webapp.WebAppContext@5e5f92
> > > > >
> > > >
> > >
> >
> {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > org.mortbay.jetty.webapp.WebAppContext@1d4ab0e
> > > > >
> > {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > org.mortbay.jetty.webapp.WebAppContext@12a585c
> > > > > {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > org.mortbay.jetty.webapp.WebAppContext@10f3801
> > > > >
> > >
> {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > > org.mortbay.jetty.webapp.WebAppContext@2606b8
> > > > > {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > > > > 140444 [ActiveMQ ShutdownHook] INFO
> > > > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> > > Connector:0
> > > > > Stopped
> > > > > 140447 [ActiveMQ ShutdownHook] INFO
> > > > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> > > Connector:1
> > > > > Stopped
> > > > > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
> > > > > org.apache.activemq.broker.TransportConnection  - Stopping
> > connection:
> > > > > vm://localhost#0
> > > > > 140452 [VMTransport] DEBUG
> > > org.apache.activemq.store.amq.AMQMessageStore
> > > >  -
> > > > > flush starting ...
> > > > > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG
> > > > > org.apache.activemq.ActiveMQConnection  - Async exception with no
> > > > exception
> > > > > listener: javax.jms.JMSException: Could not correlate
> acknowledgment
> > > with
> > > > > dispatched message: MessageAck {commandId = 8, responseRequired =
> > > false,
> > > > > ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> > > > > firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1,
> > lastMessageId
> > > =
> > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > topic://IN.ltbl,
> > > > > transactionId = null, messageCount = 0}
> > > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > > dispatched
> > > > > message: MessageAck {commandId = 8, responseRequired = false,
> ackType
> > =
> > > > 0,
> > > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> > topic://IN.ltbl,
> > > > > transactionId = null, messageCount = 0}
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > > >         at
> > > > >
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > >         at
> > > > >
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > > >         at
> > > > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > > >         at
> > > > >
> > >
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > > 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection
>  -
> > > > Async
> > > > > exception with no exception listener:
> > > > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > > > (vm://localhost#1) disposed.
> > > > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > > > (vm://localhost#1) disposed.
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > > > >         at
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > > > >         at
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > > > >         at java.lang.Thread.run(Thread.java:619)
> > > > >
> > > > >
> > > > > The only workaround I found is to remove activemq-data directory at
> > web
> > > > > application site before starting Jetty again.
> > > > >
> > > > > Can you tell me what I'm doing wrong? I have not seen such behavior
> > > with
> > > > > ActiveMQ 5.1.
> > > > >
> > > > > Regards,
> > > > > Jarek
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > http://blog.garytully.com
> > >
> > > Open Source SOA
> > > http://FUSESource.com
> > >
> >
>
>
>
> --
> http://blog.garytully.com
>
> Open Source SOA
> http://FUSESource.com
>

Re: Embedded broker hangs after restart

Posted by Gary Tully <ga...@gmail.com>.
Jarek,
good news, but that exception is ugly. It would be smashing if you could
reproduce in a Junit tests case using just the JMS apis, but it may be
difficult to reproduce.

I think I need to turn the queue test case from AMQ-2149 into a Topic
scenario. That may reproduce.

2009/3/26 Jarosław Pałka <jp...@gmail.com>

> Gary,
>
> I did recommended changes. On the embedded broker side everything looks
> fine
> now. I restarted Jetty (embedded broker) many times and it worked.
>
> What is interesting I started to see some exceptions in my other
> application
> that is listening on the same ActiveMQ server. I think below schema will
> help to explain my topology,
>
> client ---> topic://OUTGOING (embedded broker with JMS connector) -->
> topic://OUT.subtopic (ActiveMQ server) --> Mule ESB
>
> When message is sent from client to MuleESB, after implementing your
> changes
> I started to see follwoing message:
>
> 19761 [ActiveMQ Session Task] DEBUG
> org.mule.transport.jms.MultiConsumerJmsMessageReceiver  - Message Received
> from: jms://topic:OUT.*
> 19761 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  - There is
> no session id on the request using key: ID. Generating new session id:
> 575fc82b-1a0f-11de-8ce7-c7a955038e40
> 19762 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  -
> dispatching event to service: service.que-outgoing, event is: MuleEvent:
> 575fc82c-1a0f-11de-8ce7-c7a955038e40, sync=false, stop processing=f
> alse, DefaultInboundEndpoint{endpointUri=jms://topic:OUT.*,
> connector=ActiveMQJmsConnector{this=ad97f5, started=true, initialised=true,
> name='jmsConnector', disposed=false, numberOfConcurrentTransactedReceiv
> ers=4, createMultipleTransactedReceivers=true, connected=true,
> supportedProtocols=[jms], serviceOverrides=null},
> transformer=[JMSMessageToObject{this=170b819, name='JMSMessageToObject',
> ignoreBadInput=false,
>  returnClass=class java.lang.Object, sourceTypes=[interface
> javax.jms.Message, interface javax.jms.TextMessage, interface
> javax.jms.ObjectMessage, interface javax.jms.BytesMessage, interface
> javax.jms.MapMes
> sage, interface javax.jms.StreamMessage]}, XML2UMMTransformer{this=146ad8b,
> name='xml2umm', ignoreBadInput=false, returnClass=class java.lang.Object,
> sourceTypes=[]}, ExtractHeadersTransformer{this=12b9f14,
> name='extract-headers', ignoreBadInput=false, returnClass=class
> java.lang.Object, sourceTypes=[]}], name='endpoint.jms.OUT',
> properties={durableName=cozaciota},
> transactionConfig=Transaction{factory=null, ac
> tion=NEVER, timeout=0}, filter=null, deleteUnacceptedMessages=false,
> securityFilter=null, synchronous=false, initialState=started,
> remoteSync=false, remoteSyncTimeout=10000, endpointEncoding=UTF-8}
> 10459 [ActiveMQ Connection Worker: vm://localhost#2] DEBUG
> org.apache.activemq.ActiveMQConnection  - Async exception with no exception
> listener: java.lang.NullPointerException
> java.lang.NullPointerException
>        at
>
> org.apache.activemq.kaha.impl.index.VMIndexLinkedList.remove(VMIndexLinkedList.java:265)
>        at
>
> org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:314)
>        at
>
> org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:695)
>        at
>
> org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(KahaTopicReferenceStore.java:148)
>        at
>
> org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:163)
>        at
>
> org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:91)
>        at
> org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:447)
>        at
>
> org.apache.activemq.broker.region.DurableTopicSubscription.acknowledge(DurableTopicSubscription.java:223)
>        at
>
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:238)
>         at
>
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
>        at
>
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
>        at
>
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
>        at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>        at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>        at
>
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
>        at
>
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
>        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
>        at
>
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>        at
>
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>        at
>
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
>         at
>
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>        at
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205)
>        at
>
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
>        at
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>        at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
>        at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>        at java.lang.Thread.run(Thread.java:619)
>
> Application is working correctly, the only thing is this exception in logs.
> For testing purposes I'm running both client and Mule ESB under same JVM.
>
> Jarek
>
>
> W dniu 26 marca 2009 14:26 użytkownik Gary Tully <gary.tully@gmail.com
> >napisał:
>
> > Hi Jarek,
> > some similar behaviour is expressed by issue
> > AMQ-2149<https://issues.apache.org/activemq/browse/AMQ-2149>.
> >
> > One aspect of the problem is related to the persistent index used by the
> > Kaha store. You could try using a VM (in memory) index to see if it helps
> > in
> > your case.
> >
> > To configure set the setPersistentIndex attribute to false as follows:
> >
> >        broker = ....
> >        AMQPersistenceAdapterFactory persistenceFactory = new
> > AMQPersistenceAdapterFactory();
> >        persistenceFactory.setDataDirectory(dataDirFile);
> >        persistenceFactory.setPersistentIndex(false);
> >        broker.setPersistenceFactory(persistenceFactory);
> >
> > hope this helps,
> > Gary.
> >
> > 2009/3/26 Jarosław Pałka <jp...@gmail.com>
> >
> > > Sorry I forgot to copy embedded broker configuration:
> > >
> > > public abstract class UMMClientConfiguration extends
> > ConfigurationSupport{
> > >
> > >    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
> > >    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
> > >
> > >    @Bean
> > >    public BrokerService brokerService() throws Exception {
> > >        XBeanBrokerService broker = new XBeanBrokerService();
> > >        broker.setUseJmx(false);
> > >        broker.setPersistent(true);
> > >        //broker.
> > >        broker.addJmsConnector(incomingTopicConnector());
> > >        broker.addJmsConnector(outgoingTopicConnector());
> > >        broker.addConnector("vm://localhost");
> > >        broker.start();
> > >        return broker;
> > >    }
> > >
> > >     @Bean
> > >    public JmsTopicConnector outgoingTopicConnector()
> > >            throws UnknownHostException, URISyntaxException {
> > >        JmsTopicConnector topicConnector = new JmsTopicConnector();
> > >        topicConnector
> > >
> > > .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
> > >        topicConnector.setLocalClientId("local");
> > >        topicConnector
> > >                .setOutboundTopicBridges(new OutboundTopicBridge[] {
> > > outboundTopicBridge() });
> > >        return topicConnector;
> > >    }
> > >
> > >    @Bean
> > >    public TopicConnectionFactory outboundTopicConnectionFactory()
> > >            throws UnknownHostException, URISyntaxException {
> > >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > > ActiveMQConnectionFactory(
> > >                getAMQBrokerURL());
> > >        activeMQConnectionFactory.setClientID(getClientID());
> > >        return activeMQConnectionFactory;
> > >    }
> > >
> > >    @Bean
> > >    public OutboundTopicBridge outboundTopicBridge() throws
> > > URISyntaxException {
> > >        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
> > >        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
> > >        topicBridge.setOutboundTopicName(getOutboundTopicName());
> > >        topicBridge.setConsumerName(getConsumerName());
> > >        return topicBridge;
> > >    }
> > >
> > >    @Bean
> > >    public JmsTemplate jmsTemplate() throws UnknownHostException,
> > >            URISyntaxException {
> > >        JmsTemplate jmsTemplate = new JmsTemplate(
> > >                localOutgoingTopicConnectionFactory());
> > >        jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
> > >        jmsTemplate.setDeliveryPersistent(true);
> > >        jmsTemplate.setPubSubDomain(true);
> > >        return jmsTemplate;
> > >    }
> > >
> > >      @Bean(dependsOn = "brokerService")
> > >    public TopicConnectionFactory localOutgoingTopicConnectionFactory()
> > >            throws UnknownHostException, URISyntaxException {
> > >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > > ActiveMQConnectionFactory(
> > >                "vm://localhost");
> > >
>  activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
> > >        return activeMQConnectionFactory;
> > >    }
> > > }
> > >
> > > Regards,
> > > Jarek
> > >
> > > W dniu 26 marca 2009 10:01 użytkownik Jarosław Pałka <jpalka@gmail.com
> > > >napisał:
> > >
> > > > Hi,
> > > >
> > > > I have strange behavior of embedded ActiveMQ 5.2 broker.
> > > >
> > > > I have an web application running under Jetty 1.6.14 (with Spring
> > 2.5.6).
> > > I
> > > > have embedded broker that is connected to ActiveMQ server through JMS
> > > > connector.I have to topics INCOMING and OUTGOING that retrieve and
> > > forward
> > > > messages to ActiveMQ server.
> > > > I use XBeanBrokerService to configure embedded broker. The
> > configuration
> > > of
> > > > embedded broker is in attachment.
> > > >
> > > > Everything runs great until restart of Jetty, once restarted web
> > > > application is able to receive messages but it doesn't send messages.
> I
> > > got
> > > > following exception logged in ActiveMQ server:
> > > >
> > > > ERROR RecoveryListenerAdapter        - Message id
> > > > ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered from
> > the
> > > > data store - already dispatched
> > > > ERROR Service                        - Async error occurred:
> > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > dispatched
> > > > message: MessageAck {commandId = 8, responseRequired = false, ackType
> =
> > > 0,
> > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> topic://IN.ltbl,
> > > > transactionId = null, messageCount = 0}
> > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > dispatched
> > > > message: MessageAck {commandId = 8, responseRequired = false, ackType
> =
> > > 0,
> > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> topic://IN.ltbl,
> > > > transactionId = null, messageCount = 0}
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > >         at
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >         at
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > >         at
> > > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > >         at
> > > >
> > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > >         at java.lang.Thread.run(Thread.java:619)
> > > >
> > > > and similar message at web application side during shutdown:
> > > >
> > > > 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> > > > 2009-03-26 09:23:10.975::INFO:  Graceful shutdown
> > > > SelectChannelConnector@0.0.0.0:8981
> > > > 140421 [ActiveMQ ShutdownHook] INFO
> > > > org.apache.activemq.broker.BrokerService  - ActiveMQ Message Broker
> > > > (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > > > 140421 [ActiveMQ ShutdownHook] DEBUG
> > > > org.apache.activemq.broker.BrokerService  - Caught exception, must be
> > > > shutting down: java.lang.IllegalStateException: Shutdown in progress
> > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > org.mortbay.jetty.webapp.WebAppContext@173831b
> > > > {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > org.mortbay.jetty.webapp.WebAppContext@1abab88
> > > > {/,/home/jpalka/jetty-6.1.14/webapps/test}
> > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > org.mortbay.jetty.handler.ContextHandler@b1b4c3
> > > > {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > org.mortbay.jetty.webapp.WebAppContext@5e5f92
> > > >
> > >
> >
> {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > org.mortbay.jetty.webapp.WebAppContext@1d4ab0e
> > > >
> {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > org.mortbay.jetty.webapp.WebAppContext@12a585c
> > > > {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > org.mortbay.jetty.webapp.WebAppContext@10f3801
> > > >
> > {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > > org.mortbay.jetty.webapp.WebAppContext@2606b8
> > > > {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > > > 140444 [ActiveMQ ShutdownHook] INFO
> > > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> > Connector:0
> > > > Stopped
> > > > 140447 [ActiveMQ ShutdownHook] INFO
> > > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> > Connector:1
> > > > Stopped
> > > > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
> > > > org.apache.activemq.broker.TransportConnection  - Stopping
> connection:
> > > > vm://localhost#0
> > > > 140452 [VMTransport] DEBUG
> > org.apache.activemq.store.amq.AMQMessageStore
> > >  -
> > > > flush starting ...
> > > > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG
> > > > org.apache.activemq.ActiveMQConnection  - Async exception with no
> > > exception
> > > > listener: javax.jms.JMSException: Could not correlate acknowledgment
> > with
> > > > dispatched message: MessageAck {commandId = 8, responseRequired =
> > false,
> > > > ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> > > > firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1,
> lastMessageId
> > =
> > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> topic://IN.ltbl,
> > > > transactionId = null, messageCount = 0}
> > > > javax.jms.JMSException: Could not correlate acknowledgment with
> > > dispatched
> > > > message: MessageAck {commandId = 8, responseRequired = false, ackType
> =
> > > 0,
> > > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination =
> topic://IN.ltbl,
> > > > transactionId = null, messageCount = 0}
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > >         at
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >         at
> > > >
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > >         at
> > > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > >         at
> > > >
> > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > >         at java.lang.Thread.run(Thread.java:619)
> > > > 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection  -
> > > Async
> > > > exception with no exception listener:
> > > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > > (vm://localhost#1) disposed.
> > > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > > (vm://localhost#1) disposed.
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > > >         at
> > > >
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > > >         at
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > > >         at
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > > >         at java.lang.Thread.run(Thread.java:619)
> > > >
> > > >
> > > > The only workaround I found is to remove activemq-data directory at
> web
> > > > application site before starting Jetty again.
> > > >
> > > > Can you tell me what I'm doing wrong? I have not seen such behavior
> > with
> > > > ActiveMQ 5.1.
> > > >
> > > > Regards,
> > > > Jarek
> > > >
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > http://blog.garytully.com
> >
> > Open Source SOA
> > http://FUSESource.com
> >
>



-- 
http://blog.garytully.com

Open Source SOA
http://FUSESource.com

Re: Embedded broker hangs after restart

Posted by Jarosław Pałka <jp...@gmail.com>.
Gary,

I did recommended changes. On the embedded broker side everything looks fine
now. I restarted Jetty (embedded broker) many times and it worked.

What is interesting I started to see some exceptions in my other application
that is listening on the same ActiveMQ server. I think below schema will
help to explain my topology,

client ---> topic://OUTGOING (embedded broker with JMS connector) -->
topic://OUT.subtopic (ActiveMQ server) --> Mule ESB

When message is sent from client to MuleESB, after implementing your changes
I started to see follwoing message:

19761 [ActiveMQ Session Task] DEBUG
org.mule.transport.jms.MultiConsumerJmsMessageReceiver  - Message Received
from: jms://topic:OUT.*
19761 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  - There is
no session id on the request using key: ID. Generating new session id:
575fc82b-1a0f-11de-8ce7-c7a955038e40
19762 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession  -
dispatching event to service: service.que-outgoing, event is: MuleEvent:
575fc82c-1a0f-11de-8ce7-c7a955038e40, sync=false, stop processing=f
alse, DefaultInboundEndpoint{endpointUri=jms://topic:OUT.*,
connector=ActiveMQJmsConnector{this=ad97f5, started=true, initialised=true,
name='jmsConnector', disposed=false, numberOfConcurrentTransactedReceiv
ers=4, createMultipleTransactedReceivers=true, connected=true,
supportedProtocols=[jms], serviceOverrides=null},
transformer=[JMSMessageToObject{this=170b819, name='JMSMessageToObject',
ignoreBadInput=false,
 returnClass=class java.lang.Object, sourceTypes=[interface
javax.jms.Message, interface javax.jms.TextMessage, interface
javax.jms.ObjectMessage, interface javax.jms.BytesMessage, interface
javax.jms.MapMes
sage, interface javax.jms.StreamMessage]}, XML2UMMTransformer{this=146ad8b,
name='xml2umm', ignoreBadInput=false, returnClass=class java.lang.Object,
sourceTypes=[]}, ExtractHeadersTransformer{this=12b9f14,
name='extract-headers', ignoreBadInput=false, returnClass=class
java.lang.Object, sourceTypes=[]}], name='endpoint.jms.OUT',
properties={durableName=cozaciota},
transactionConfig=Transaction{factory=null, ac
tion=NEVER, timeout=0}, filter=null, deleteUnacceptedMessages=false,
securityFilter=null, synchronous=false, initialState=started,
remoteSync=false, remoteSyncTimeout=10000, endpointEncoding=UTF-8}
10459 [ActiveMQ Connection Worker: vm://localhost#2] DEBUG
org.apache.activemq.ActiveMQConnection  - Async exception with no exception
listener: java.lang.NullPointerException
java.lang.NullPointerException
        at
org.apache.activemq.kaha.impl.index.VMIndexLinkedList.remove(VMIndexLinkedList.java:265)
        at
org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:314)
        at
org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:695)
        at
org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(KahaTopicReferenceStore.java:148)
        at
org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:163)
        at
org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:91)
        at
org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:447)
        at
org.apache.activemq.broker.region.DurableTopicSubscription.acknowledge(DurableTopicSubscription.java:223)
        at
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:238)
        at
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
        at
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
        at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
        at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
        at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
        at
org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
        at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
        at
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
        at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at
org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205)
        at
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
        at
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)

Application is working correctly, the only thing is this exception in logs.
For testing purposes I'm running both client and Mule ESB under same JVM.

Jarek


W dniu 26 marca 2009 14:26 użytkownik Gary Tully <ga...@gmail.com>napisał:

> Hi Jarek,
> some similar behaviour is expressed by issue
> AMQ-2149<https://issues.apache.org/activemq/browse/AMQ-2149>.
>
> One aspect of the problem is related to the persistent index used by the
> Kaha store. You could try using a VM (in memory) index to see if it helps
> in
> your case.
>
> To configure set the setPersistentIndex attribute to false as follows:
>
>        broker = ....
>        AMQPersistenceAdapterFactory persistenceFactory = new
> AMQPersistenceAdapterFactory();
>        persistenceFactory.setDataDirectory(dataDirFile);
>        persistenceFactory.setPersistentIndex(false);
>        broker.setPersistenceFactory(persistenceFactory);
>
> hope this helps,
> Gary.
>
> 2009/3/26 Jarosław Pałka <jp...@gmail.com>
>
> > Sorry I forgot to copy embedded broker configuration:
> >
> > public abstract class UMMClientConfiguration extends
> ConfigurationSupport{
> >
> >    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
> >    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
> >
> >    @Bean
> >    public BrokerService brokerService() throws Exception {
> >        XBeanBrokerService broker = new XBeanBrokerService();
> >        broker.setUseJmx(false);
> >        broker.setPersistent(true);
> >        //broker.
> >        broker.addJmsConnector(incomingTopicConnector());
> >        broker.addJmsConnector(outgoingTopicConnector());
> >        broker.addConnector("vm://localhost");
> >        broker.start();
> >        return broker;
> >    }
> >
> >     @Bean
> >    public JmsTopicConnector outgoingTopicConnector()
> >            throws UnknownHostException, URISyntaxException {
> >        JmsTopicConnector topicConnector = new JmsTopicConnector();
> >        topicConnector
> >
> > .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
> >        topicConnector.setLocalClientId("local");
> >        topicConnector
> >                .setOutboundTopicBridges(new OutboundTopicBridge[] {
> > outboundTopicBridge() });
> >        return topicConnector;
> >    }
> >
> >    @Bean
> >    public TopicConnectionFactory outboundTopicConnectionFactory()
> >            throws UnknownHostException, URISyntaxException {
> >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > ActiveMQConnectionFactory(
> >                getAMQBrokerURL());
> >        activeMQConnectionFactory.setClientID(getClientID());
> >        return activeMQConnectionFactory;
> >    }
> >
> >    @Bean
> >    public OutboundTopicBridge outboundTopicBridge() throws
> > URISyntaxException {
> >        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
> >        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
> >        topicBridge.setOutboundTopicName(getOutboundTopicName());
> >        topicBridge.setConsumerName(getConsumerName());
> >        return topicBridge;
> >    }
> >
> >    @Bean
> >    public JmsTemplate jmsTemplate() throws UnknownHostException,
> >            URISyntaxException {
> >        JmsTemplate jmsTemplate = new JmsTemplate(
> >                localOutgoingTopicConnectionFactory());
> >        jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
> >        jmsTemplate.setDeliveryPersistent(true);
> >        jmsTemplate.setPubSubDomain(true);
> >        return jmsTemplate;
> >    }
> >
> >      @Bean(dependsOn = "brokerService")
> >    public TopicConnectionFactory localOutgoingTopicConnectionFactory()
> >            throws UnknownHostException, URISyntaxException {
> >        ActiveMQConnectionFactory activeMQConnectionFactory = new
> > ActiveMQConnectionFactory(
> >                "vm://localhost");
> >        activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
> >        return activeMQConnectionFactory;
> >    }
> > }
> >
> > Regards,
> > Jarek
> >
> > W dniu 26 marca 2009 10:01 użytkownik Jarosław Pałka <jpalka@gmail.com
> > >napisał:
> >
> > > Hi,
> > >
> > > I have strange behavior of embedded ActiveMQ 5.2 broker.
> > >
> > > I have an web application running under Jetty 1.6.14 (with Spring
> 2.5.6).
> > I
> > > have embedded broker that is connected to ActiveMQ server through JMS
> > > connector.I have to topics INCOMING and OUTGOING that retrieve and
> > forward
> > > messages to ActiveMQ server.
> > > I use XBeanBrokerService to configure embedded broker. The
> configuration
> > of
> > > embedded broker is in attachment.
> > >
> > > Everything runs great until restart of Jetty, once restarted web
> > > application is able to receive messages but it doesn't send messages. I
> > got
> > > following exception logged in ActiveMQ server:
> > >
> > > ERROR RecoveryListenerAdapter        - Message id
> > > ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered from
> the
> > > data store - already dispatched
> > > ERROR Service                        - Async error occurred:
> > > javax.jms.JMSException: Could not correlate acknowledgment with
> > dispatched
> > > message: MessageAck {commandId = 8, responseRequired = false, ackType =
> > 0,
> > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > > transactionId = null, messageCount = 0}
> > > javax.jms.JMSException: Could not correlate acknowledgment with
> > dispatched
> > > message: MessageAck {commandId = 8, responseRequired = false, ackType =
> > 0,
> > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > > transactionId = null, messageCount = 0}
> > >         at
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > >         at
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > >         at
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > >         at
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > >         at
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >         at
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >         at
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > >         at
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > >         at
> > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > >         at
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > >         at
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > >         at
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > >         at
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > >         at
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > >         at
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > >         at
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > >         at
> > >
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > >         at java.lang.Thread.run(Thread.java:619)
> > >
> > > and similar message at web application side during shutdown:
> > >
> > > 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> > > 2009-03-26 09:23:10.975::INFO:  Graceful shutdown
> > > SelectChannelConnector@0.0.0.0:8981
> > > 140421 [ActiveMQ ShutdownHook] INFO
> > > org.apache.activemq.broker.BrokerService  - ActiveMQ Message Broker
> > > (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > > 140421 [ActiveMQ ShutdownHook] DEBUG
> > > org.apache.activemq.broker.BrokerService  - Caught exception, must be
> > > shutting down: java.lang.IllegalStateException: Shutdown in progress
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.WebAppContext@173831b
> > > {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.WebAppContext@1abab88
> > > {/,/home/jpalka/jetty-6.1.14/webapps/test}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.handler.ContextHandler@b1b4c3
> > > {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.WebAppContext@5e5f92
> > >
> >
> {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.WebAppContext@1d4ab0e
> > > {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.WebAppContext@12a585c
> > > {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.WebAppContext@10f3801
> > >
> {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > > org.mortbay.jetty.webapp.WebAppContext@2606b8
> > > {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > > 140444 [ActiveMQ ShutdownHook] INFO
> > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> Connector:0
> > > Stopped
> > > 140447 [ActiveMQ ShutdownHook] INFO
> > > org.apache.activemq.network.jms.JmsConnector  - JMS Connector
> Connector:1
> > > Stopped
> > > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
> > > org.apache.activemq.broker.TransportConnection  - Stopping connection:
> > > vm://localhost#0
> > > 140452 [VMTransport] DEBUG
> org.apache.activemq.store.amq.AMQMessageStore
> >  -
> > > flush starting ...
> > > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG
> > > org.apache.activemq.ActiveMQConnection  - Async exception with no
> > exception
> > > listener: javax.jms.JMSException: Could not correlate acknowledgment
> with
> > > dispatched message: MessageAck {commandId = 8, responseRequired =
> false,
> > > ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> > > firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId
> =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > > transactionId = null, messageCount = 0}
> > > javax.jms.JMSException: Could not correlate acknowledgment with
> > dispatched
> > > message: MessageAck {commandId = 8, responseRequired = false, ackType =
> > 0,
> > > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > > transactionId = null, messageCount = 0}
> > >         at
> > >
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > >         at
> > >
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > >         at
> > >
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > >         at
> > >
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > >         at
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >         at
> > >
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > >         at
> > >
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > >         at
> > >
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > >         at
> > > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > >         at
> > >
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > >         at
> > >
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > >         at
> > >
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > >         at
> > >
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > >         at
> > >
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > >         at
> > >
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > >         at
> > >
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > >         at
> > >
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > >         at java.lang.Thread.run(Thread.java:619)
> > > 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection  -
> > Async
> > > exception with no exception listener:
> > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > (vm://localhost#1) disposed.
> > > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > > (vm://localhost#1) disposed.
> > >         at
> > >
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> > >         at
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > >         at
> > >
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > >         at
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > >         at
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > >         at java.lang.Thread.run(Thread.java:619)
> > >
> > >
> > > The only workaround I found is to remove activemq-data directory at web
> > > application site before starting Jetty again.
> > >
> > > Can you tell me what I'm doing wrong? I have not seen such behavior
> with
> > > ActiveMQ 5.1.
> > >
> > > Regards,
> > > Jarek
> > >
> > >
> > >
> >
>
>
>
> --
> http://blog.garytully.com
>
> Open Source SOA
> http://FUSESource.com
>

Re: Embedded broker hangs after restart

Posted by Gary Tully <ga...@gmail.com>.
Hi Jarek,
some similar behaviour is expressed by issue
AMQ-2149<https://issues.apache.org/activemq/browse/AMQ-2149>.

One aspect of the problem is related to the persistent index used by the
Kaha store. You could try using a VM (in memory) index to see if it helps in
your case.

To configure set the setPersistentIndex attribute to false as follows:

        broker = ....
        AMQPersistenceAdapterFactory persistenceFactory = new
AMQPersistenceAdapterFactory();
        persistenceFactory.setDataDirectory(dataDirFile);
        persistenceFactory.setPersistentIndex(false);
        broker.setPersistenceFactory(persistenceFactory);

hope this helps,
Gary.

2009/3/26 Jarosław Pałka <jp...@gmail.com>

> Sorry I forgot to copy embedded broker configuration:
>
> public abstract class UMMClientConfiguration extends ConfigurationSupport{
>
>    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
>    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
>
>    @Bean
>    public BrokerService brokerService() throws Exception {
>        XBeanBrokerService broker = new XBeanBrokerService();
>        broker.setUseJmx(false);
>        broker.setPersistent(true);
>        //broker.
>        broker.addJmsConnector(incomingTopicConnector());
>        broker.addJmsConnector(outgoingTopicConnector());
>        broker.addConnector("vm://localhost");
>        broker.start();
>        return broker;
>    }
>
>     @Bean
>    public JmsTopicConnector outgoingTopicConnector()
>            throws UnknownHostException, URISyntaxException {
>        JmsTopicConnector topicConnector = new JmsTopicConnector();
>        topicConnector
>
> .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
>        topicConnector.setLocalClientId("local");
>        topicConnector
>                .setOutboundTopicBridges(new OutboundTopicBridge[] {
> outboundTopicBridge() });
>        return topicConnector;
>    }
>
>    @Bean
>    public TopicConnectionFactory outboundTopicConnectionFactory()
>            throws UnknownHostException, URISyntaxException {
>        ActiveMQConnectionFactory activeMQConnectionFactory = new
> ActiveMQConnectionFactory(
>                getAMQBrokerURL());
>        activeMQConnectionFactory.setClientID(getClientID());
>        return activeMQConnectionFactory;
>    }
>
>    @Bean
>    public OutboundTopicBridge outboundTopicBridge() throws
> URISyntaxException {
>        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
>        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
>        topicBridge.setOutboundTopicName(getOutboundTopicName());
>        topicBridge.setConsumerName(getConsumerName());
>        return topicBridge;
>    }
>
>    @Bean
>    public JmsTemplate jmsTemplate() throws UnknownHostException,
>            URISyntaxException {
>        JmsTemplate jmsTemplate = new JmsTemplate(
>                localOutgoingTopicConnectionFactory());
>        jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
>        jmsTemplate.setDeliveryPersistent(true);
>        jmsTemplate.setPubSubDomain(true);
>        return jmsTemplate;
>    }
>
>      @Bean(dependsOn = "brokerService")
>    public TopicConnectionFactory localOutgoingTopicConnectionFactory()
>            throws UnknownHostException, URISyntaxException {
>        ActiveMQConnectionFactory activeMQConnectionFactory = new
> ActiveMQConnectionFactory(
>                "vm://localhost");
>        activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
>        return activeMQConnectionFactory;
>    }
> }
>
> Regards,
> Jarek
>
> W dniu 26 marca 2009 10:01 użytkownik Jarosław Pałka <jpalka@gmail.com
> >napisał:
>
> > Hi,
> >
> > I have strange behavior of embedded ActiveMQ 5.2 broker.
> >
> > I have an web application running under Jetty 1.6.14 (with Spring 2.5.6).
> I
> > have embedded broker that is connected to ActiveMQ server through JMS
> > connector.I have to topics INCOMING and OUTGOING that retrieve and
> forward
> > messages to ActiveMQ server.
> > I use XBeanBrokerService to configure embedded broker. The configuration
> of
> > embedded broker is in attachment.
> >
> > Everything runs great until restart of Jetty, once restarted web
> > application is able to receive messages but it doesn't send messages. I
> got
> > following exception logged in ActiveMQ server:
> >
> > ERROR RecoveryListenerAdapter        - Message id
> > ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered from the
> > data store - already dispatched
> > ERROR Service                        - Async error occurred:
> > javax.jms.JMSException: Could not correlate acknowledgment with
> dispatched
> > message: MessageAck {commandId = 8, responseRequired = false, ackType =
> 0,
> > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > transactionId = null, messageCount = 0}
> > javax.jms.JMSException: Could not correlate acknowledgment with
> dispatched
> > message: MessageAck {commandId = 8, responseRequired = false, ackType =
> 0,
> > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > transactionId = null, messageCount = 0}
> >         at
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> >         at
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> >         at
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> >         at
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> >         at
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >         at
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >         at
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> >         at
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> >         at
> > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> >         at
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> >         at
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> >         at
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> >         at
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> >         at
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> >         at
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> >         at
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> >         at
> > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> >         at java.lang.Thread.run(Thread.java:619)
> >
> > and similar message at web application side during shutdown:
> >
> > 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> > 2009-03-26 09:23:10.975::INFO:  Graceful shutdown
> > SelectChannelConnector@0.0.0.0:8981
> > 140421 [ActiveMQ ShutdownHook] INFO
> > org.apache.activemq.broker.BrokerService  - ActiveMQ Message Broker
> > (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > 140421 [ActiveMQ ShutdownHook] DEBUG
> > org.apache.activemq.broker.BrokerService  - Caught exception, must be
> > shutting down: java.lang.IllegalStateException: Shutdown in progress
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > org.mortbay.jetty.webapp.WebAppContext@173831b
> > {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > org.mortbay.jetty.webapp.WebAppContext@1abab88
> > {/,/home/jpalka/jetty-6.1.14/webapps/test}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > org.mortbay.jetty.handler.ContextHandler@b1b4c3
> > {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > org.mortbay.jetty.webapp.WebAppContext@5e5f92
> >
> {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > org.mortbay.jetty.webapp.WebAppContext@1d4ab0e
> > {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > org.mortbay.jetty.webapp.WebAppContext@12a585c
> > {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > org.mortbay.jetty.webapp.WebAppContext@10f3801
> > {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> > org.mortbay.jetty.webapp.WebAppContext@2606b8
> > {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > 140444 [ActiveMQ ShutdownHook] INFO
> > org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:0
> > Stopped
> > 140447 [ActiveMQ ShutdownHook] INFO
> > org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:1
> > Stopped
> > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
> > org.apache.activemq.broker.TransportConnection  - Stopping connection:
> > vm://localhost#0
> > 140452 [VMTransport] DEBUG org.apache.activemq.store.amq.AMQMessageStore
>  -
> > flush starting ...
> > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG
> > org.apache.activemq.ActiveMQConnection  - Async exception with no
> exception
> > listener: javax.jms.JMSException: Could not correlate acknowledgment with
> > dispatched message: MessageAck {commandId = 8, responseRequired = false,
> > ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> > firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > transactionId = null, messageCount = 0}
> > javax.jms.JMSException: Could not correlate acknowledgment with
> dispatched
> > message: MessageAck {commandId = 8, responseRequired = false, ackType =
> 0,
> > consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> > ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> > ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> > transactionId = null, messageCount = 0}
> >         at
> >
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> >         at
> >
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> >         at
> >
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> >         at
> >
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> >         at
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >         at
> > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >         at
> >
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> >         at
> >
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> >         at
> > org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> >         at
> >
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> >         at
> >
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> >         at
> >
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> >         at
> >
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> >         at
> >
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> >         at
> >
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> >         at
> >
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> >         at
> > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> >         at java.lang.Thread.run(Thread.java:619)
> > 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection  -
> Async
> > exception with no exception listener:
> > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > (vm://localhost#1) disposed.
> > org.apache.activemq.transport.TransportDisposedIOException: Peer
> > (vm://localhost#1) disposed.
> >         at
> >
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> >         at
> >
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> >         at
> >
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> >         at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> >         at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> >         at java.lang.Thread.run(Thread.java:619)
> >
> >
> > The only workaround I found is to remove activemq-data directory at web
> > application site before starting Jetty again.
> >
> > Can you tell me what I'm doing wrong? I have not seen such behavior with
> > ActiveMQ 5.1.
> >
> > Regards,
> > Jarek
> >
> >
> >
>



-- 
http://blog.garytully.com

Open Source SOA
http://FUSESource.com

Re: Embedded broker hangs after restart

Posted by Jarosław Pałka <jp...@gmail.com>.
Sorry I forgot to copy embedded broker configuration:

public abstract class UMMClientConfiguration extends ConfigurationSupport{

    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";

    @Bean
    public BrokerService brokerService() throws Exception {
        XBeanBrokerService broker = new XBeanBrokerService();
        broker.setUseJmx(false);
        broker.setPersistent(true);
        //broker.
        broker.addJmsConnector(incomingTopicConnector());
        broker.addJmsConnector(outgoingTopicConnector());
        broker.addConnector("vm://localhost");
        broker.start();
        return broker;
    }

     @Bean
    public JmsTopicConnector outgoingTopicConnector()
            throws UnknownHostException, URISyntaxException {
        JmsTopicConnector topicConnector = new JmsTopicConnector();
        topicConnector

.setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
        topicConnector.setLocalClientId("local");
        topicConnector
                .setOutboundTopicBridges(new OutboundTopicBridge[] {
outboundTopicBridge() });
        return topicConnector;
    }

    @Bean
    public TopicConnectionFactory outboundTopicConnectionFactory()
            throws UnknownHostException, URISyntaxException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new
ActiveMQConnectionFactory(
                getAMQBrokerURL());
        activeMQConnectionFactory.setClientID(getClientID());
        return activeMQConnectionFactory;
    }

    @Bean
    public OutboundTopicBridge outboundTopicBridge() throws
URISyntaxException {
        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
        topicBridge.setOutboundTopicName(getOutboundTopicName());
        topicBridge.setConsumerName(getConsumerName());
        return topicBridge;
    }

    @Bean
    public JmsTemplate jmsTemplate() throws UnknownHostException,
            URISyntaxException {
        JmsTemplate jmsTemplate = new JmsTemplate(
                localOutgoingTopicConnectionFactory());
        jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }

      @Bean(dependsOn = "brokerService")
    public TopicConnectionFactory localOutgoingTopicConnectionFactory()
            throws UnknownHostException, URISyntaxException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new
ActiveMQConnectionFactory(
                "vm://localhost");
        activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
        return activeMQConnectionFactory;
    }
}

Regards,
Jarek

W dniu 26 marca 2009 10:01 użytkownik Jarosław Pałka <jp...@gmail.com>napisał:

> Hi,
>
> I have strange behavior of embedded ActiveMQ 5.2 broker.
>
> I have an web application running under Jetty 1.6.14 (with Spring 2.5.6). I
> have embedded broker that is connected to ActiveMQ server through JMS
> connector.I have to topics INCOMING and OUTGOING that retrieve and forward
> messages to ActiveMQ server.
> I use XBeanBrokerService to configure embedded broker. The configuration of
> embedded broker is in attachment.
>
> Everything runs great until restart of Jetty, once restarted web
> application is able to receive messages but it doesn't send messages. I got
> following exception logged in ActiveMQ server:
>
> ERROR RecoveryListenerAdapter        - Message id
> ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered from the
> data store - already dispatched
> ERROR Service                        - Async error occurred:
> javax.jms.JMSException: Could not correlate acknowledgment with dispatched
> message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
> consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> transactionId = null, messageCount = 0}
> javax.jms.JMSException: Could not correlate acknowledgment with dispatched
> message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
> consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> transactionId = null, messageCount = 0}
>         at
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
>         at
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
>         at
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
>         at
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
>         at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>         at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>         at
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
>         at
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
>         at
> org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
>         at
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>         at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>         at
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>         at
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>         at
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>         at
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>         at java.lang.Thread.run(Thread.java:619)
>
> and similar message at web application side during shutdown:
>
> 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> 2009-03-26 09:23:10.975::INFO:  Graceful shutdown
> SelectChannelConnector@0.0.0.0:8981
> 140421 [ActiveMQ ShutdownHook] INFO
> org.apache.activemq.broker.BrokerService  - ActiveMQ Message Broker
> (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> 140421 [ActiveMQ ShutdownHook] DEBUG
> org.apache.activemq.broker.BrokerService  - Caught exception, must be
> shutting down: java.lang.IllegalStateException: Shutdown in progress
> 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> org.mortbay.jetty.webapp.WebAppContext@173831b
> {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> org.mortbay.jetty.webapp.WebAppContext@1abab88
> {/,/home/jpalka/jetty-6.1.14/webapps/test}
> 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> org.mortbay.jetty.handler.ContextHandler@b1b4c3
> {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> org.mortbay.jetty.webapp.WebAppContext@5e5f92
> {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> org.mortbay.jetty.webapp.WebAppContext@1d4ab0e
> {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> org.mortbay.jetty.webapp.WebAppContext@12a585c
> {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> org.mortbay.jetty.webapp.WebAppContext@10f3801
> {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> 2009-03-26 09:23:10.987::INFO:  Graceful shutdown
> org.mortbay.jetty.webapp.WebAppContext@2606b8
> {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> 140444 [ActiveMQ ShutdownHook] INFO
> org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:0
> Stopped
> 140447 [ActiveMQ ShutdownHook] INFO
> org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:1
> Stopped
> 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG
> org.apache.activemq.broker.TransportConnection  - Stopping connection:
> vm://localhost#0
> 140452 [VMTransport] DEBUG org.apache.activemq.store.amq.AMQMessageStore  -
> flush starting ...
> 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG
> org.apache.activemq.ActiveMQConnection  - Async exception with no exception
> listener: javax.jms.JMSException: Could not correlate acknowledgment with
> dispatched message: MessageAck {commandId = 8, responseRequired = false,
> ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1,
> firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> transactionId = null, messageCount = 0}
> javax.jms.JMSException: Could not correlate acknowledgment with dispatched
> message: MessageAck {commandId = 8, responseRequired = false, ackType = 0,
> consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId =
> ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId =
> ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl,
> transactionId = null, messageCount = 0}
>         at
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
>         at
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
>         at
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
>         at
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
>         at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>         at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>         at
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
>         at
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
>         at
> org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
>         at
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>         at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>         at
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>         at
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>         at
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>         at
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>         at java.lang.Thread.run(Thread.java:619)
> 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection  - Async
> exception with no exception listener:
> org.apache.activemq.transport.TransportDisposedIOException: Peer
> (vm://localhost#1) disposed.
> org.apache.activemq.transport.TransportDisposedIOException: Peer
> (vm://localhost#1) disposed.
>         at
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
>         at
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
>         at
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>         at java.lang.Thread.run(Thread.java:619)
>
>
> The only workaround I found is to remove activemq-data directory at web
> application site before starting Jetty again.
>
> Can you tell me what I'm doing wrong? I have not seen such behavior with
> ActiveMQ 5.1.
>
> Regards,
> Jarek
>
>
>