You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Keith Wall (JIRA)" <ji...@apache.org> on 2015/10/17 09:46:05 UTC
[jira] [Resolved] (QPID-6798) Broker decompressing a message on
behalf of a client who cannot produces stack trace to Broker log
[ https://issues.apache.org/jira/browse/QPID-6798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Keith Wall resolved QPID-6798.
------------------------------
Resolution: Invalid
This is a client side defect. The sample code sends the same {{Message}} over and over again, but a defect in {{BasicMessageProducer}} means that the the message goes correctly compressed on first send, but if resent, the payload goes uncompressed but with a gzip header.
> Broker decompressing a message on behalf of a client who cannot produces stack trace to Broker log
> --------------------------------------------------------------------------------------------------
>
> Key: QPID-6798
> URL: https://issues.apache.org/jira/browse/QPID-6798
> Project: Qpid
> Issue Type: Bug
> Components: Java Broker
> Reporter: Keith Wall
> Priority: Blocker
> Fix For: qpid-java-6.0
>
>
> Running the following, which has one client connection sending messages that are compressed on the client, and a second client receiving with compression turned off, forcing the broker to decompress the message on the client's behalf, produces the following stack trace in the Broker log.
> I don't yet understand the issue - from the client's perspective the message appears to be received correctly.
> {noformat:java}
> private void runTest()
> {
> try (InputStream resourceAsStream = this.getClass().getResourceAsStream("hello.properties"))
> {
> Properties properties = new Properties();
> properties.load(resourceAsStream);
> Context context = new InitialContext(properties);
> ConnectionFactory senderCF = (ConnectionFactory) context.lookup("senderCF");
> ConnectionFactory receiverCF = (ConnectionFactory) context.lookup("receiverCF");
> Connection sender = senderCF.createConnection(); // enable compression
> Connection receiver = receiverCF.createConnection(); // disable compression
> receiver.start();
> Session senderSession = sender.createSession(false, Session.AUTO_ACKNOWLEDGE);
> Session receiverSession = receiver.createSession(false, Session.AUTO_ACKNOWLEDGE);
> Destination destination = (Destination) context.lookup("queue1");
> MessageProducer messageProducer = senderSession.createProducer(destination);
> MessageConsumer messageConsumer = receiverSession.createConsumer(destination);
> String textToSend = createMessageText();
> TextMessage message = senderSession.createTextMessage(textToSend);
> message.setStringProperty("bar", "foo");
> int count=0;
> while(true)
> {
> messageProducer.send(message);
> Message receivedMessage = messageConsumer.receive(1000);
> TextMessage receivedTextMessage = (TextMessage) receivedMessage;
> if (!((TextMessage) receivedTextMessage).getText().equals(textToSend))
> {
> throw new IllegalStateException("Differing text received");
> }
> System.out.print(".");
> count++;
> if (count % 10 ==0)
> {
> System.out.println();
> }
> }
> }
> catch (Exception exp)
> {
> exp.printStackTrace();
> }
> }
> private String createMessageText()
> {
> StringBuilder stringBuilder = new StringBuilder();
> while(stringBuilder.length() < 2048*1024)
> {
> stringBuilder.append("This should compress easily. ");
> }
> return stringBuilder.toString();
> }
> {noformat}
> {noformat}
> connectionfactory.senderCF = amqp://guest:guest@sender/?brokerlist='tcp://localhost:5672'&compressMessages='true'
> connectionfactory.receiverCF = amqp://guest:guest@receiver/?brokerlist='tcp://localhost:5672'&compressMessages='false'
> queue.queue1 = myqueue
> {noformat}
> {noformat}2015-10-16 17:22:26,183 WARN [IO-/127.0.0.1:53352] (o.a.q.u.GZIPUtils) - Unexpected IOException when attempting to uncompress with gzip
> java.util.zip.ZipException: Not in GZIP format
> at java.util.zip.GZIPInputStream.readHeader(GZIPInputStream.java:164) ~[na:1.7.0_79]
> at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:78) ~[na:1.7.0_79]
> at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:90) ~[na:1.7.0_79]
> at org.apache.qpid.util.GZIPUtils.uncompressStreamToArray(GZIPUtils.java:100) [qpid-common-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.util.GZIPUtils.uncompressBufferToArray(GZIPUtils.java:87) [qpid-common-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:117) [qpid-broker-plugins-amqp-0-8-protocol-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:101) [qpid-broker-plugins-amqp-0-8-protocol-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeDeliver(ProtocolOutputConverterImpl.java:78) [qpid-broker-plugins-amqp-0-8-protocol-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8$WriteDeliverMethod.deliverToClient(AMQPConnection_0_8.java:1469) [qpid-broker-plugins-amqp-0-8-protocol-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8.sendToClient(ConsumerTarget_0_8.java:465) [qpid-broker-plugins-amqp-0-8-protocol-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8$AckConsumer.doSend(ConsumerTarget_0_8.java:275) [qpid-broker-plugins-amqp-0-8-protocol-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.consumer.AbstractConsumerTarget.sendNextMessage(AbstractConsumerTarget.java:211) [qpid-broker-core-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.consumer.AbstractConsumerTarget.processPending(AbstractConsumerTarget.java:63) [qpid-broker-core-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.protocol.v0_8.AMQChannel.processPending(AMQChannel.java:3797) [qpid-broker-plugins-amqp-0-8-protocol-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.processPending(AMQPConnection_0_8.java:1598) [qpid-broker-plugins-amqp-0-8-protocol-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.transport.MultiVersionProtocolEngine.processPending(MultiVersionProtocolEngine.java:197) [qpid-broker-core-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:223) [qpid-broker-core-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:114) [qpid-broker-core-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.run(SelectorThread.java:214) [qpid-broker-core-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at org.apache.qpid.server.transport.SelectorThread.run(SelectorThread.java:169) [qpid-broker-core-6.0.0-SNAPSHOT.jar:6.0.0-SNAPSHOT]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_79]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_79]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org