Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/11/11 11:49:50 UTC

[GitHub] [pulsar] codelipenghui opened a new issue #5616: [DISCUSS] Consumer messages from un-support compression type

codelipenghui opened a new issue #5616: [DISCUSS] Consumer messages from un-support compression type
URL: https://github.com/apache/pulsar/issues/5616
 
 
   **Is your feature request related to a problem? Please describe.**
   Currently, if an old-version Pulsar consumer (e.g. 2.2.0) reads messages from a newer-version producer (e.g. 2.4.0), and the producer enables compression with a type that 2.2.0 does not support, we get the following error logs:
   
   ```
   19:26:55.918 [pulsar-client-io-1-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Got exception IndexOutOfBoundsException : writerIndex: 682963024 (expected: readerIndex(83) <= writerIndex <= capacity(28476))
   java.lang.IndexOutOfBoundsException: writerIndex: 682963024 (expected: readerIndex(83) <= writerIndex <= capacity(28476))
   	at io.netty.buffer.AbstractByteBuf.writerIndex(AbstractByteBuf.java:118) ~[org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at org.apache.pulsar.common.api.Commands.deSerializeSingleMessageInBatch(Commands.java:1070) ~[org.apache.pulsar-pulsar-common-2.2.0.jar:2.2.0]
   	at org.apache.pulsar.client.impl.ConsumerImpl.receiveIndividualMessagesFromBatch(ConsumerImpl.java:971) ~[org.apache.pulsar-pulsar-client-original-2.2.0.jar:2.2.0]
   	at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:860) ~[org.apache.pulsar-pulsar-client-original-2.2.0.jar:2.2.0]
   	at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:287) ~[org.apache.pulsar-pulsar-client-original-2.2.0.jar:2.2.0]
   	at org.apache.pulsar.common.api.PulsarDecoder.channelRead(PulsarDecoder.java:169) ~[org.apache.pulsar-pulsar-common-2.2.0.jar:2.2.0]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:806) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:404) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:304) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
   	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
   19:26:55.919 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x6515eaab, L:/127.0.0.1:36184 ! R:localhost/127.0.0.1:6650] Disconnected
   19:26:55.919 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://sql/stat/position-feature-recommend-smater-observation-byte] [test] Closed connection [id: 0x6515eaab, L:/127.0.0.1:36184 ! R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
   ```
   
   It's difficult to troubleshoot the problem from this error log alone. After running some compatibility tests, we found the cause: the producer was using the ZSTD compression type, while the consumer was on version 2.2.0, which does not support ZSTD compression yet.
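
One detail in the trace above may help with diagnosis: the bogus `writerIndex` of 682963024 is exactly the reported reader index (83) plus 682962941, which is 0x28B52FFD, the four-byte Zstandard frame magic (`28 B5 2F FD`) read as a big-endian integer. This suggests, as an assumption about the 2.2.0 client rather than something verified in its source, that the consumer treated the still-compressed payload as uncompressed batch data and read the ZSTD header as a length prefix. The sketch below only reproduces that arithmetic with plain `java.nio`; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

final class ZstdMagicAsLength {
    // First four bytes of a Zstandard frame: magic number 0xFD2FB528,
    // stored little-endian, so it appears on the wire as 28 B5 2F FD.
    static final byte[] ZSTD_MAGIC = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD};

    // Interpret four bytes as a big-endian unsigned 32-bit length prefix
    // (ByteBuffer is big-endian by default).
    static long readUnsignedIntBigEndian(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt() & 0xFFFFFFFFL;
    }

    // Assumption: the decoder sets writerIndex = readerIndex + length prefix.
    static long bogusWriterIndex(int readerIndex) {
        return readerIndex + readUnsignedIntBigEndian(ZSTD_MAGIC);
    }

    public static void main(String[] args) {
        // 83 is the readerIndex reported in the exception message.
        System.out.println(bogusWriterIndex(83)); // prints 682963024
    }
}
```

If the same pattern holds elsewhere, a `writerIndex` close to 682962941 in this kind of exception is a hint that ZSTD-compressed data reached a client that cannot decode it.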
   
   **Describe the solution you'd like**
   We need clearer logs that show the consumer failed to read because the compression type is unsupported, and then we should close the consumer.
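
A minimal sketch of what such a fail-fast check could look like is shown here. It is not the Pulsar client's actual code: `CompressionCheck`, `UnsupportedCompressionException`, `ensureSupported`, and the numeric id range are all hypothetical names and values chosen for illustration. The idea is to validate the compression type announced in the message metadata before any decoding happens, and to fail with a message that names the real cause.

```java
final class CompressionCheck {
    /** Raised before the payload reaches the batch decoder (hypothetical type). */
    static final class UnsupportedCompressionException extends RuntimeException {
        UnsupportedCompressionException(String message) {
            super(message);
        }
    }

    // Assumption for this sketch: ids 0..2 are the codecs an older client
    // knows how to decode; anything else was added by a newer version.
    static final int HIGHEST_KNOWN_ID = 2;

    // Reject unknown compression ids with an explicit, actionable message.
    static void ensureSupported(int compressionId, String topic) {
        if (compressionId < 0 || compressionId > HIGHEST_KNOWN_ID) {
            throw new UnsupportedCompressionException(
                "Cannot read message on topic " + topic
                    + ": compression type id " + compressionId
                    + " is not supported by this client version;"
                    + " upgrade the client or change the producer's compression type");
        }
    }

    public static void main(String[] args) {
        ensureSupported(1, "my-topic"); // known id: passes silently
        try {
            ensureSupported(3, "my-topic"); // unknown id: fails with a clear reason
        } catch (UnsupportedCompressionException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The caller could catch this exception, log it once, and close the consumer instead of letting the connection drop and reconnect in a loop.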
   
   If you have any ideas, please leave a comment here. Looking forward to your feedback.
