Posted to dev@kafka.apache.org by Bhavesh Mistry <mi...@gmail.com> on 2014/10/20 21:19:05 UTC

[Java New Producer] Snappy NPE Issue

Hi Kafka Dev,

I am getting the following issue with the Snappy library.  I checked the
Snappy code and it seems to be fine.  Have you seen this issue?

2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.NullPointerException
    at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
    at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
    at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
    at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:744)


Here is the Snappy code at line 153 of BufferRecycler.java:
http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153

    if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {


Thanks,

Bhavesh

Re: [Java New Producer] Snappy NPE Issue

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Ewen,

It seems Leo has fixed this issue in the snappy library.  Here are the changes:
https://github.com/xerial/snappy-java/commit/7b86642f75c280debf3c1983053ea7f8635b48a5


Here is the JAR with the fix:
https://oss.sonatype.org/content/repositories/snapshots/org/xerial/snappy/snappy-java/1.1.1.4-SNAPSHOT/


I will try this out this afternoon.  If it works, would you be able to
upgrade Kafka trunk to this version?

Thanks,

Bhavesh

On Mon, Oct 20, 2014 at 9:53 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Ewen,
>
> Thanks for doing the deep analysis on this issue.  I have filed this issue
> with the Snappy project and linked it to the Kafka issue.  Here are the
> details of the GitHub issue:  https://github.com/xerial/snappy-java/issues/88
>
> I will follow up with the Snappy developers to figure out how to solve this
> problem.  For us, this is a typical use case: a web app running in a J2EE
> container with a thread pool and recycled threads.
>
> Thanks,
>
> Bhavesh

Re: [Java New Producer] Snappy NPE Issue

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Ewen,

Thanks for doing the deep analysis on this issue.  I have filed this issue
with the Snappy project and linked it to the Kafka issue.  Here are the
details of the GitHub issue:  https://github.com/xerial/snappy-java/issues/88

I will follow up with the Snappy developers to figure out how to solve this
problem.  For us, this is a typical use case: a web app running in a J2EE
container with a thread pool and recycled threads.

Thanks,

Bhavesh

On Mon, Oct 20, 2014 at 6:56 PM, Ewen Cheslack-Postava <me...@ewencp.org>
wrote:

> Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
> since it either requires an updated version of the upstream library, a
> workaround by us, or at a bare minimum clear documentation of the issue.

Re: [Java New Producer] Snappy NPE Issue

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
since it either requires an updated version of the upstream library, a
workaround by us, or at a bare minimum clear documentation of the issue.

On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote:
> I took a quick look at this since I noticed the same issue when testing
> your code for the issues you filed. I think the issue is that you're
> using all your producers across a thread pool and the snappy library
> uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
> they may be allocated from the same thread (e.g. one of your MyProducer
> classes calls Producer.send() on multiple producers from the same
> thread) and therefore use the same BufferRecycler. Eventually you hit
> the code in the stacktrace, and if two producer send threads hit it
> concurrently they improperly share the unsynchronized BufferRecycler.
> 
> This seems like a pain to fix -- it's really a deficiency of the snappy
> library and as far as I can see there's no external control over
> BufferRecycler in their API. One possibility is to record the thread ID
> when we generate a new stream in Compressor and use that to synchronize
> access to ensure no concurrent BufferRecycler access. That could be made
> specific to snappy so it wouldn't impact other codecs. Not exactly
> ideal, but it would work. Unfortunately I can't think of any way for you
> to protect against this in your own code since the problem arises in the
> producer send thread, which your code should never know about.
> 
> Another option would be to set up your producers differently to avoid the
> possibility of unsynchronized access from multiple threads (i.e. don't
> use the same thread pool approach), but whether you can do that will
> depend on your use case.
> 
> -Ewen

Re: [Java New Producer] Snappy NPE Issue

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
I took a quick look at this since I noticed the same issue when testing
your code for the issues you filed. I think the issue is that you're
using all your producers across a thread pool and the snappy library
uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
they may be allocated from the same thread (e.g. one of your MyProducer
classes calls Producer.send() on multiple producers from the same
thread) and therefore use the same BufferRecycler. Eventually you hit
the code in the stacktrace, and if two producer send threads hit it
concurrently they improperly share the unsynchronized BufferRecycler.
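
A minimal sketch of the sharing described above, assuming a simplified
stand-in for the recycler. The names Recycler and Stream are hypothetical
and are not snappy-java's actual classes; the sketch only shows that objects
created on the same thread pick up the same ThreadLocal instance, which they
then carry with them to whatever thread later closes them.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SharedRecyclerSketch {
    // Hypothetical stand-in for a per-thread recycler; NOT snappy-java's real class.
    static final class Recycler {
        private static final ThreadLocal<Recycler> PER_THREAD =
                ThreadLocal.withInitial(Recycler::new);

        static Recycler instance() {
            return PER_THREAD.get();
        }
    }

    // Hypothetical stream that captures the recycler of the thread that creates it.
    static final class Stream {
        final Recycler recycler = Recycler.instance();
    }

    // Creates a Stream on a separate thread and hands it back to the caller.
    static Stream createOnOtherThread() {
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            Callable<Stream> create = Stream::new;
            return other.submit(create).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        Stream a = new Stream();           // created on the main thread
        Stream b = new Stream();           // created on the main thread as well
        Stream c = createOnOtherThread();  // created on a different thread

        System.out.println("a and b share a recycler: " + (a.recycler == b.recycler));
        System.out.println("a and c share a recycler: " + (a.recycler == c.recycler));
    }
}
```

Running main reports true for a and b and false for c. If a and b are later
closed by two different threads, both threads operate on the one recycler
without any synchronization.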

This seems like a pain to fix -- it's really a deficiency of the snappy
library and as far as I can see there's no external control over
BufferRecycler in their API. One possibility is to record the thread ID
when we generate a new stream in Compressor and use that to synchronize
access to ensure no concurrent BufferRecycler access. That could be made
specific to snappy so it wouldn't impact other codecs. Not exactly
ideal, but it would work. Unfortunately I can't think of any way for you
to protect against this in your own code since the problem arises in the
producer send thread, which your code should never know about.
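
The thread-ID idea could be sketched roughly as follows. This is only an
illustration under assumptions: CreatorThreadLocks and GuardedStream are
hypothetical names, not Kafka or snappy-java code, and the places where a
real stream would be allocated or closed are marked with comments.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class CreatorThreadLocks {
    // One lock object per creating-thread ID (hypothetical helper).
    private static final ConcurrentMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    static Object lockFor(long creatorThreadId) {
        return LOCKS.computeIfAbsent(creatorThreadId, id -> new Object());
    }

    // Hypothetical wrapper that remembers which thread created the stream.
    static final class GuardedStream {
        private final long creatorThreadId;
        private volatile boolean closed;

        private GuardedStream(long creatorThreadId) {
            this.creatorThreadId = creatorThreadId;
        }

        // Allocation takes the lock of the calling (creating) thread.
        static GuardedStream open() {
            long id = Thread.currentThread().getId();
            synchronized (lockFor(id)) {
                return new GuardedStream(id); // a real stream would be allocated here
            }
        }

        // close() may run on any thread, but it takes the lock tied to the
        // creating thread, so streams from one creator never overlap here.
        void close() {
            synchronized (lockFor(creatorThreadId)) {
                closed = true; // a real stream's close() would run here
            }
        }

        long creatorThreadId() {
            return creatorThreadId;
        }

        boolean isClosed() {
            return closed;
        }
    }

    public static void main(String[] args) {
        GuardedStream s1 = GuardedStream.open();
        GuardedStream s2 = GuardedStream.open();
        s1.close();
        System.out.println("same lock: "
                + (lockFor(s1.creatorThreadId()) == lockFor(s2.creatorThreadId())));
    }
}
```

The lock map in this sketch grows with the number of creating threads and is
never cleaned up; a real workaround would need to bound it.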

Another option would be to set up your producers differently to avoid the
possibility of unsynchronized access from multiple threads (i.e. don't
use the same thread pool approach), but whether you can do that will
depend on your use case.
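
One possible reading of this suggestion is to give each producer its own
dedicated calling thread instead of sharing a pool. The sketch below shows
that pattern only; ProducerPerThreadSketch and the producer names are
hypothetical, the send action is a placeholder for a real send() call, and
whether this fully avoids the problem is an assumption, not something
verified against snappy-java.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ProducerPerThreadSketch {
    // One single-thread executor per producer name (names are hypothetical).
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    // Runs the send action for the named producer on that producer's own
    // thread and returns the name of the thread that ran it.
    String sendOn(String producerName, Runnable sendAction) {
        ExecutorService lane = lanes.computeIfAbsent(
                producerName, n -> Executors.newSingleThreadExecutor());
        Future<String> ran = lane.submit(() -> {
            sendAction.run(); // a real producer's send() would be called here
            return Thread.currentThread().getName();
        });
        try {
            return ran.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Stops all per-producer threads so the JVM can exit.
    void shutdown() {
        lanes.values().forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        ProducerPerThreadSketch sketch = new ProducerPerThreadSketch();
        String first = sketch.sendOn("producer-a", () -> { });
        String second = sketch.sendOn("producer-b", () -> { });
        System.out.println("distinct threads: " + !first.equals(second));
        sketch.shutdown();
    }
}
```

With this layout no calling thread ever touches more than one producer, so
anything allocated per thread on the calling side belongs to one producer.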

-Ewen
