Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2013/12/10 21:34:07 UTC

[jira] [Commented] (AMQ-4920) AmqpErrorException occurs with multiple concurrent amqp topic consumers

    [ https://issues.apache.org/jira/browse/AMQ-4920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13844634#comment-13844634 ] 

Timothy Bish commented on AMQ-4920:
-----------------------------------

My guess is that the message is being dispatched to two different transports at the same time, and the AMQP transport code that modifies headers and reads the message body causes one transport to read the message while it's in an invalid state.

I made the change below and can no longer reproduce the problem. It still requires further investigation to confirm that this is what's really happening.

{code}
                    org.apache.activemq.command.Message message = null;
                    if (md.getMessage() != null) {
                        // Work on a private copy so that header changes and body reads
                        // here do not touch the instance dispatched to other transports.
                        message = md.getMessage().copy();
                        if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
                            message.setProperty(MESSAGE_FORMAT_KEY, 0);
                        }
                    }
                    final ActiveMQMessage jms = (ActiveMQMessage) message;
{code}
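
To illustrate why copying before dispatch could help, here is a minimal, self-contained sketch. It is not ActiveMQ code: SimpleBytesMessage, its copy() and readBytes() methods, and transform() are hypothetical stand-ins that only model a message with a mutable read cursor. The two reads run sequentially to keep the outcome deterministic, so this only approximates the suspected concurrent interleaving and does not confirm the actual cause.

```java
import java.util.Arrays;

public class CopyBeforeDispatch {

    /** Hypothetical stand-in for a bytes message: a payload plus a mutable read cursor. */
    static final class SimpleBytesMessage {
        private final byte[] body;
        private int readPos; // per-reader state; this is the hazard when the instance is shared

        SimpleBytesMessage(byte[] body) {
            this.body = body.clone();
        }

        /** Returns an independent message with its own cursor, starting at 0. */
        SimpleBytesMessage copy() {
            return new SimpleBytesMessage(body);
        }

        /** Fills dest from the current cursor; returns bytes read, or -1 when exhausted. */
        int readBytes(byte[] dest) {
            if (readPos >= body.length) {
                return -1; // dest is left untouched (all zeros if freshly allocated)
            }
            int n = Math.min(dest.length, body.length - readPos);
            System.arraycopy(body, readPos, dest, 0, n);
            readPos += n;
            return n;
        }

        int length() {
            return body.length;
        }
    }

    /** Simulates one transport encoding the message for its consumer. */
    static byte[] transform(SimpleBytesMessage msg) {
        byte[] data = new byte[msg.length()];
        msg.readBytes(data);
        return data;
    }

    public static void main(String[] args) {
        SimpleBytesMessage shared = new SimpleBytesMessage(new byte[] {1, 2, 3, 4});

        // Without a copy: the first transport advances the cursor, the second gets zeros.
        byte[] first = transform(shared);
        byte[] second = transform(shared);
        System.out.println("shared: " + Arrays.toString(first) + " / " + Arrays.toString(second));

        // With a copy per dispatch: each transport reads from its own cursor.
        SimpleBytesMessage original = new SimpleBytesMessage(new byte[] {1, 2, 3, 4});
        byte[] a = transform(original.copy());
        byte[] b = transform(original.copy());
        System.out.println("copied: " + Arrays.toString(a) + " / " + Arrays.toString(b));
    }
}
```

In this model the shared instance yields an all-zero buffer on the second read, while the per-dispatch copies both yield the full payload. The cost is one extra copy per dispatch.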

> AmqpErrorException occurs with multiple concurrent amqp topic consumers
> -----------------------------------------------------------------------
>
>                 Key: AMQ-4920
>                 URL: https://issues.apache.org/jira/browse/AMQ-4920
>             Project: ActiveMQ
>          Issue Type: Bug
>            Reporter: Kevin Earls
>            Assignee: Kevin Earls
>
> I'll add a test to reproduce this.  There are currently 2 problems.  The more frequent one looks like:
> org.apache.qpid.amqp_1_0.type.AmqpErrorException
>         at org.apache.qpid.amqp_1_0.codec.ValueHandler.readConstructor(ValueHandler.java:99)
>         at org.apache.qpid.amqp_1_0.codec.ValueHandler.parse(ValueHandler.java:90)
>         at org.apache.qpid.amqp_1_0.codec.ValueHandler.readConstructor(ValueHandler.java:105)
>         at org.apache.qpid.amqp_1_0.codec.ValueHandler.parse(ValueHandler.java:90)
> … repeated many times
>         at org.apache.qpid.amqp_1_0.codec.ValueHandler.readConstructor(ValueHandler.java:105)
>         at org.apache.qpid.amqp_1_0.codec.ValueHandler.parse(ValueHandler.java:90)
>         at org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl.parseAll(SectionDecoderImpl.java:49)
>         at org.apache.qpid.amqp_1_0.client.Receiver.receive(Receiver.java:280)
>         at org.apache.qpid.amqp_1_0.jms.impl.MessageConsumerImpl.receive0(MessageConsumerImpl.java:286)
>         at org.apache.qpid.amqp_1_0.jms.impl.MessageConsumerImpl.receiveImpl(MessageConsumerImpl.java:255)
>         at org.apache.qpid.amqp_1_0.jms.impl.MessageConsumerImpl.receive(MessageConsumerImpl.java:238)
>         at org.apache.qpid.amqp_1_0.jms.impl.MessageConsumerImpl.receive(MessageConsumerImpl.java:56)
>         at org.apache.activemq.transport.amqp.ENTMQ466ConsumerThread.run(ENTMQ466Test.java:123)
> This occurs at the line "final EncodedMessage amqp = outboundTransformer.transform(jms);" in the ConsumerContext.pumpOutbound() method of AmqpProtocolConverter. This call sometimes returns with its content (amqp.getArray()) set to all zeros.
> For those messages, this line
> LOG.info("In pumpOutbound, setting currentBuffer to offset {} length {} content [{}]", amqp.getArrayOffset(), amqp.getLength(), amqp.getArray());
> logs:
> 2013-11-26 17:19:16,680 [calhost] Task-3] - INFO AmqpProtocolConverter - In pumpOutbound, setting currentBuffer to offset 0 length 162 content [[0, 0, 0, 0, 0, \
> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\
> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0\
> , 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
> At the root of this, outboundTransformer is a (proton) AutoOutboundTransformer.  It calls AMQPNativeOutboundTransformer.transform(), which calls msg.readBytes(data), which sometimes writes all 0s to data.  Here msg is an ActiveMQBytesMessage.  


