Posted to dev@activemq.apache.org by "Kai Pruente (JIRA)" <ji...@apache.org> on 2006/08/17 15:08:23 UTC

[jira] Created: (AMQ-882) TopicPublisher.publish(topicSession.createTextMessage("Hello World") hangs and throws a JMSException

TopicPublisher.publish(topicSession.createTextMessage("Hello World") hangs and throws a JMSException
----------------------------------------------------------------------------------------------------

                 Key: AMQ-882
                 URL: https://issues.apache.org/activemq/browse/AMQ-882
             Project: ActiveMQ
          Issue Type: Bug
          Components: Transport
    Affects Versions: 4.0.1
         Environment: Server: Suse Linux 2.6.5-7.244-smp, JDK  1.5.0_07
                      Client: Windows XP SP2, JDK  1.5.0_06

            Reporter: Kai Pruente


Scenario:
ActiveMQ and the publisher process run on the same server.
Several clients run on several Windows XP machines.

Publisher code:
{code}
// initializing
connection = msgFactory.createTopicConnection();
connection.setExceptionListener(new JMSExceptionListener());
connection.start();

topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = topicSession.createTopic(MFS_LOCATION_CHANGE_EVENT_TOPIC);

publisher = topicSession.createPublisher(topic);
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
[...]
// sending several times with the same topic
try {
  publisher.publish(topicSession.createTextMessage(location.getName()));
} catch (JMSException e) {
  log.fatal("Problems during informing Workplace topic:" + location.getName(), e);
  [...]
{code}

Subscriber code:
{code}
topicConnection = msgFactory.createTopicConnection();
topicConnection.start();
topicConnection.setExceptionListener(new JMSExceptionListener(listeners, this));

topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSession.createTopic(EventSender.MFS_LOCATION_CHANGE_EVENT_TOPIC);

topicSubscriber = topicSession.createSubscriber(topic);
topicSubscriber.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
          if (message instanceof TextMessage) {
[...]
      topicConnection.start();
{code}

After several thousand messages, publish() hangs for more than one minute and then throws a JMSException (see the log below). After searching the ActiveMQ mailing lists, I changed the broker URL of the publisher from:
* tcp://arvwms:61616
to 
* tcp://arvwms:61616?soTimeout=2000&connectionTimeout=10000&socketBufferSize=1024&wireFormat.maxInactivityDuration=0

This worked fine for a few days, but now we are getting the exceptions again.

Is this a problem between the topic publisher and ActiveMQ, or could it also be a problem between ActiveMQ and a topic subscriber? If it really is a problem between the publisher and ActiveMQ, it cannot be a network problem, because both run on the same server.

This problem occurs about 10 times a day. I could live with the exception and the loss of about 10 messages a day, but the hang of about one minute is terrible for our application.

The Exception:
{code}
DEBUG 2006-08-17 12:45:53.738 portConfirmationDefaultHandler :    -:     - handle telegram: com.ssn.acx.extensions.logistics.mfsadapter.telegram.receiving.TransportOrderCompletionTelegram@1385c9f[ID=139887884,loadUnit=01900,lastLocation=SCS_CS,weight=<null>,orientation=0,infoType=COMPLETE,wmsID=1039340,reason=OK]
FATAL 2006-08-17 12:47:12.534 EventSender                    :    -:     - Problems during informing Workplace topic:SCS_CS
javax.jms.JMSException: Broken pipe
  at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
  at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1094)
  at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1553)
  at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:462)
  at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:356)
  at org.apache.activemq.ActiveMQTopicPublisher.publish(ActiveMQTopicPublisher.java:128)
  at com.ssn.acx.extensions.logistics.mfsadapter.event.EventSender.sendLocationChangeEvent(EventSender.java:150)
  at com.ssn.acx.extensions.logistics.mfsadapter.MFSTransactionWithSendingTrigger.commit(MFSTransactionWithSendingTrigger.java:109)
  at com.ssn.acx.core.common.transaction.GlobalTransactionImpl.commit(GlobalTransactionImpl.java:198)
  at com.ssn.acx.core.common.adapterservice.TelegramDispatcher.handleTelegram(TelegramDispatcher.java:306)
  at com.ssn.acx.core.common.adapterservice.TelegramDispatcher.dispatch(TelegramDispatcher.java:180)
  at com.ssn.acx.core.common.adapterservice.AbstractCollector.dispatch(AbstractCollector.java:81)
  at com.ssn.acx.api.common.adapterservice.TriggeredCollector.dispatch(TriggeredCollector.java:87)
  at com.ssn.acx.core.logistics.mfsadapter.MFSCollector.collectTelegrams(MFSCollector.java:122)
  at com.ssn.acx.core.logistics.mfsadapter.WakeUpListener.run(WakeUpListener.java:142)
  at java.lang.Thread.run(Thread.java:595)
Caused by: java.net.SocketException: Broken pipe
  at java.net.SocketOutputStream.socketWrite0(Native Method)
  at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
  at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
  at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:108)
  at java.io.DataOutputStream.flush(DataOutputStream.java:106)
  at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:125)
  at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:141)
  at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:78)
  at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:77)
  at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
  at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
  at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1092)
  ... 14 more
WARN  2006-08-17 12:47:12.555 EventSender                    :    -:     - Destroy EventSender and cleanup JMS resources failed! Caught: javax.jms.JMSException: Cannot write to the stream any more it has already been closed
javax.jms.JMSException: Cannot write to the stream any more it has already been closed
  at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
  at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1094)
  at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1655)
  at org.apache.activemq.ActiveMQMessageProducer.close(ActiveMQMessageProducer.java:315)
  at com.ssn.acx.extensions.logistics.mfsadapter.event.EventSender.destroy(EventSender.java:84)
  at com.ssn.acx.extensions.logistics.mfsadapter.event.EventSender.sendLocationChangeEvent(EventSender.java:155)
  at com.ssn.acx.extensions.logistics.mfsadapter.MFSTransactionWithSendingTrigger.commit(MFSTransactionWithSendingTrigger.java:109)
  at com.ssn.acx.core.common.transaction.GlobalTransactionImpl.commit(GlobalTransactionImpl.java:198)
  at com.ssn.acx.core.common.adapterservice.TelegramDispatcher.handleTelegram(TelegramDispatcher.java:306)
  at com.ssn.acx.core.common.adapterservice.TelegramDispatcher.dispatch(TelegramDispatcher.java:180)
  at com.ssn.acx.core.common.adapterservice.AbstractCollector.dispatch(AbstractCollector.java:81)
  at com.ssn.acx.api.common.adapterservice.TriggeredCollector.dispatch(TriggeredCollector.java:87)
  at com.ssn.acx.core.logistics.mfsadapter.MFSCollector.collectTelegrams(MFSCollector.java:122)
  at com.ssn.acx.core.logistics.mfsadapter.WakeUpListener.run(WakeUpListener.java:142)
  at java.lang.Thread.run(Thread.java:595)
Caused by: java.io.EOFException: Cannot write to the stream any more it has already been closed
  at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.checkClosed(TcpBufferedOutputStream.java:131)
  at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:69)
  at java.io.DataOutputStream.writeInt(DataOutputStream.java:180)
  at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:238)
  at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:124)
  at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:141)
  at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:78)
  at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:77)
  at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
  at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
  at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1092)
  ... 13 more
INFO  2006-08-17 12:47:12.555 EventSender                    :    -:     - Destroyed event sender
{code}


-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: https://issues.apache.org/activemq/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Resolved: (AMQ-882) TopicPublisher.publish(topicSession.createTextMessage("Hello World") hangs and throws a JMSException

Posted by "james strachan (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/activemq/browse/AMQ-882?page=all ]

james strachan resolved AMQ-882.
--------------------------------

    Fix Version/s: 4.1
                   4.0.2
                   4.0.1
       Resolution: Fixed

This problem looks like the socket is being closed for some reason on your network. Not sure why - could be a dodgy router or some intermittent issue.

The workaround is to enable failover (put failover: before the tcp: in the broker URL). See:

http://incubator.apache.org/activemq/how-can-i-support-auto-reconnection.html
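
For illustration only, here is a minimal sketch of what "putting failover: before the tcp:" could look like for the broker URL from the report. FailoverUrl and wrap() are hypothetical names, not part of ActiveMQ. The parenthesised form failover:(tcp://...) is assumed here so that any ?options stay attached to the inner tcp: URL. The sketch only builds the string; with the ActiveMQ client on the classpath, the result would be used as the broker URL of the connection factory.

```java
import java.net.URI;

// Hypothetical helper (not an ActiveMQ class): wraps a plain tcp:// broker URL
// in the failover: transport prefix so the client can reconnect automatically.
final class FailoverUrl {

    private FailoverUrl() {
    }

    // Returns the URL unchanged if it already uses the failover: scheme,
    // otherwise returns "failover:(<original url>)".
    static String wrap(String brokerUrl) {
        // URI.create throws IllegalArgumentException for a malformed URL.
        URI uri = URI.create(brokerUrl);
        if ("failover".equals(uri.getScheme())) {
            return brokerUrl;
        }
        return "failover:(" + brokerUrl + ")";
    }

    public static void main(String[] args) {
        // Host and port are taken from the report above.
        String url = wrap("tcp://arvwms:61616");
        System.out.println(url);
        // Assumption: this string is then passed as the broker URL, e.g.
        // new org.apache.activemq.ActiveMQConnectionFactory(url)
    }
}
```

The existing publisher code stays the same; only the broker URL handed to the connection factory changes.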
