Posted to dev@qpid.apache.org by "Rob Godfrey (JIRA)" <qp...@incubator.apache.org> on 2008/07/01 13:21:45 UTC

[jira] Closed: (QPID-1124) [Java Client] Use of thread-unsafe HashMap for destination->consumer causes timeouts

     [ https://issues.apache.org/jira/browse/QPID-1124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rob Godfrey closed QPID-1124.
-----------------------------


> [Java Client] Use of thread-unsafe HashMap for destination->consumer causes timeouts
> ------------------------------------------------------------------------------------
>
>                 Key: QPID-1124
>                 URL: https://issues.apache.org/jira/browse/QPID-1124
>             Project: Qpid
>          Issue Type: Bug
>          Components: Java Client
>    Affects Versions: M3
>            Reporter: Rob Godfrey
>            Assignee: Rob Godfrey
>             Fix For: M3
>
>
> When testing the trunk client against the C++ broker, I found I occasionally got:
>  java.lang.RuntimeException: timed out waiting for sync
> 	at org.apache.qpidity.transport.Session.sync(Session.java:334)
> 	at org.apache.qpidity.transport.Session.sync(Session.java:293)
> 	at org.apache.qpidity.nclient.impl.ClientSession.sync(ClientSession.java:107)
> 	at org.apache.qpid.client.AMQSession_0_10.sendConsume(AMQSession_0_10.java:407)
> 	at org.apache.qpid.client.AMQSession.consumeFromQueue(AMQSession.java:2078)
> 	at org.apache.qpid.client.AMQSession.registerConsumer(AMQSession.java:2381)
> 	at org.apache.qpid.client.AMQSession.access$500(AMQSession.java:110)
> 	at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:1648)
> 	at org.apache.qpid.client.AMQSession$4.execute(AMQSession.java:1620)
> 	at org.apache.qpid.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:119)
> 	at org.apache.qpid.client.AMQSession.createConsumerImpl(AMQSession.java:1618)
> From the following test:
>         AMQConnection con = new AMQConnection("amqp://username:password@clientid/test?brokerlist='tcp://127.0.0.1:5672'");
>         TopicSession session1 = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
>         AMQTopic topic = new AMQTopic(con.getDefaultTopicExchangeName(), "death");
>         TopicPublisher publisher = session1.createPublisher(topic);
>         MessageListener messageListener = new MessageListener()
>         {
>             public void onMessage(Message message)
>             {
>             }
>         };
>         List<TopicSubscriber> listenerList = new ArrayList<TopicSubscriber>();
>         TopicSubscriber listener = session1.createSubscriber(topic);
>         listener.setMessageListener(messageListener);
>         listenerList.add(listener);
>         con.start();
>         for(int i=0; i < 10000 ; i++)
>         {
>             BytesMessage outmsg = session1.createBytesMessage();
>             final byte[] bytes = new byte[1024];
>             outmsg.writeBytes(bytes);
>             publisher.send(outmsg);
>             System.out.println("Sent " + (i + 1) + " messages");
>             listener = session1.createSubscriber(topic);
>             listener.setMessageListener(messageListener);
>             listenerList.add(listener);
>         }
> The underlying cause is an exception in the MinaHandler (which MINA swallows):
> java.lang.NullPointerException
> 	at org.apache.qpidity.nclient.impl.ClientSessionDelegate.messageTransfer(ClientSessionDelegate.java:63)
> 	at org.apache.qpidity.nclient.impl.ClientSessionDelegate.messageTransfer(ClientSessionDelegate.java:20)
> 	at org.apache.qpidity.transport.MessageTransfer.dispatch(MessageTransfer.java:59)
> 	at org.apache.qpidity.transport.SessionDelegate.command(SessionDelegate.java:44)
> 	at org.apache.qpidity.transport.SessionDelegate.command(SessionDelegate.java:32)
> 	at org.apache.qpidity.transport.Method.delegate(Method.java:75)
> 	at org.apache.qpidity.transport.Channel.command(Channel.java:104)
> 	at org.apache.qpidity.transport.Channel.command(Channel.java:43)
> 	at org.apache.qpidity.transport.Method.delegate(Method.java:75)
> 	at org.apache.qpidity.transport.Channel.received(Channel.java:75)
> 	at org.apache.qpidity.transport.Connection.received(Connection.java:82)
> 	at org.apache.qpidity.transport.Connection.received(Connection.java:43)
> 	at org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:92)
> 	at org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:97)
> 	at org.apache.qpidity.transport.network.Assembler.assemble(Assembler.java:160)
> 	at org.apache.qpidity.transport.network.Assembler.frame(Assembler.java:129)
> 	at org.apache.qpidity.transport.network.Frame.delegate(Frame.java:145)
> 	at org.apache.qpidity.transport.network.Assembler.received(Assembler.java:102)
> 	at org.apache.qpidity.transport.network.Assembler.received(Assembler.java:51)
> 	at org.apache.qpidity.transport.network.InputHandler.frame(InputHandler.java:103)
> 	at org.apache.qpidity.transport.network.InputHandler.next(InputHandler.java:204)
> 	at org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:116)
> 	at org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:41)
> 	at org.apache.qpidity.transport.network.mina.MinaHandler.messageReceived(MinaHandler.java:84)
> This is caused by the fact that the _messageListeners Map on ClientSession is not thread-safe, but is being accessed simultaneously by more than one thread.
> The fix is to use a ConcurrentHashMap.
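
For illustration only, here is a minimal sketch of the kind of change described above. It is not the actual Qpid patch: the class ListenerRegistry, its methods, and the use of String keys and Runnable listeners are all hypothetical stand-ins for ClientSession and its _messageListeners map. The point it tries to show is that one thread can keep registering consumers while another looks up listeners, provided the map is a ConcurrentHashMap, and that the lookup is null-checked rather than dereferenced blindly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the destination -> listener map held by a session.
public class ListenerRegistry {
    // ConcurrentHashMap permits concurrent put/get without external locking.
    private final Map<String, Runnable> listeners = new ConcurrentHashMap<String, Runnable>();

    // Called from the application thread when a consumer is created.
    public void register(String destination, Runnable listener) {
        listeners.put(destination, listener);
    }

    // Called from the I/O thread when a message arrives.
    // Returns false instead of throwing if no listener is registered.
    public boolean dispatch(String destination) {
        Runnable listener = listeners.get(destination);
        if (listener == null) {
            return false;
        }
        listener.run();
        return true;
    }

    public int size() {
        return listeners.size();
    }

    public static void main(String[] args) throws InterruptedException {
        final ListenerRegistry registry = new ListenerRegistry();
        final AtomicInteger delivered = new AtomicInteger();
        registry.register("death", delivered::incrementAndGet);

        // One thread keeps adding consumers, as the test loop in the report does...
        Thread registrar = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                registry.register("consumer-" + i, () -> { });
            }
        });
        // ...while another delivers messages to an existing consumer.
        Thread dispatcher = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                registry.dispatch("death");
            }
        });

        registrar.start();
        dispatcher.start();
        registrar.join();
        dispatcher.join();

        System.out.println("delivered=" + delivered.get() + " registered=" + registry.size());
    }
}
```

With a plain HashMap in place of the ConcurrentHashMap, a get() that runs while another thread's put() is resizing the table can return null for a key that is present, which would be consistent with the NullPointerException in the trace above.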
