Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2017/03/29 20:07:15 UTC

weird SerializationException when consumer is fetching and parsing record in streams application

Hi,
This is the first time we are getting such a weird exception.
After this the streams application crashes.

The only workaround is to manually seek and commit the group's offset to a greater
number, and we keep needing this manual intervention again and again.
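
(For reference, the manual fix looks roughly like the sketch below - a minimal,
hypothetical standalone tool, not our exact code. Broker address, group id, and
target offset are placeholders, and it is run only while the streams application
is stopped; a Streams app's consumer group id is its application.id.)

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SkipBadOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // must match the streams application.id so we move *its* committed offsets
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");          // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        TopicPartition tp = new TopicPartition("advice-stream", 6);
        long newOffset = 45153796L; // one past the bad record (placeholder)

        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.assign(Collections.singletonList(tp));
            // commit the new position for the group without reading anything
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(newOffset)));
        }
    }
}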

Any idea what is causing it and how we can circumvent it?

Note this error happens in both cases, whether a 10.2 client or a 10.1.1 client
connects to a 10.1.1 Kafka server.

So this does not look like a version issue.

Also we have the following settings:
message.max.bytes=5000013
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"

The rest is all default; also, increasing the value of
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
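
(Roughly, the client-side values are set like the sketch below; the broker address
and application id are placeholders, and how unprefixed client configs reach the
embedded consumer/producer depends on the Streams version.)

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
    static Properties build() {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // client-side size limits (values as above)
        p.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576");
        p.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576");
        // message.max.bytes=5000013 is a broker setting (server.properties), not a client property
        return p;
    }
}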

Stack trace below.

Thanks
Sachin


org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
java.lang.IllegalArgumentException: null
  at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
  at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Damian Guy <da...@gmail.com>.
Not that I'm aware of.

On Thu, 30 Mar 2017 at 16:00 Sachin Mittal <sj...@gmail.com> wrote:

> Damian,
> Is there any way where I can just dump out the contents at a given offset
> from a given log segment file.
>
> I am not sure how DumpLogSegment
> <
> https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment
> >
> helps. I already know the log segment file where that message is. Thing is
> size is 1 GB and there is no easy way to inspect that file and actually see
> what the payload is.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 7:18 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Sachin,
> >
> > Not sure if this will help, but you might want to try running
> > https://cwiki.apache.org/confluence/display/KAFKA/
> > System+Tools#SystemTools-DumpLogSegment
> > on the partition that is causing you problems.
> >
> > Thanks
> > Damian
> >
> > On Thu, 30 Mar 2017 at 14:29 Michael Noll <mi...@confluent.io> wrote:
> >
> > > Sachin,
> > >
> > > there's a JIRA that seems related to what you're seeing:
> > > https://issues.apache.org/jira/browse/KAFKA-4740
> > >
> > > Perhaps you could check the above and report back?
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mi...@confluent.io>
> > > wrote:
> > >
> > > > Hmm, I re-read the stacktrace again. It does look like the value-side
> > > > being the culprit (as Sachin suggested earlier).
> > > >
> > > > -Michael
> > > >
> > > >
> > > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mi...@confluent.io>
> > > > wrote:
> > > >
> > > >> Sachin,
> > > >>
> > > >> you have this line:
> > > >>
> > > >> > builder.stream(Serdes.String(), serde, "advice-stream")
> > > >>
> > > >> Could the problem be that not the record values are causing the
> > problem
> > > >> -- because your value deserializer does try-catch any such errors --
> > but
> > > >> that the record *keys* are malformed?  The built-in
> `Serdes.String()`
> > > does
> > > >> not try-catch deserialization errors, and from a quick look at the
> > > source
> > > >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> > > >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> > > >> error above ("Error deserializing key/value for partition..."), and
> > the
> > > >> Fetcher is swallowing the more specific SerializationException of
> > > >> `String.Serdes()` (but it will include the original
> > exception/Throwable
> > > in
> > > >> its own SerializationException).
> > > >>
> > > >> -Michael
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> My streams application does run in debug mode only.
> > > >>> Also I have checked the code around these lines
> > > >>>
> > > >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> > java:791)
> > > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> > > >>> ord(Fetcher.java:867)
> > > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>
> > > >>> I don't see any log statement which will give me more information.
> > > >>>
> > > >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> > > >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > > >>>
> > > >>> The issue is happening at this line and perhaps handling the
> > exception
> > > >>> and
> > > >>> setting the value to be null may be better options.
> > > >>> Yes at client side nothing can be done because exception is
> happening
> > > >>> before this.valueDeserializer.deserialize can be called.
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>>
> > > >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>> > The suggestions in that FAQ won't help as it is too late, i.e.,
> the
> > > >>> message
> > > >>> > has already been received into Streams.
> > > >>> > You could create a simple app that uses the Consumer, seeks to
> the
> > > >>> offset,
> > > >>> > and tries to read the message. If you did this in debug mode you
> > > might
> > > >>> find
> > > >>> > out some more information.
> > > >>> >
> > > >>> >
> > > >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com>
> > > wrote:
> > > >>> >
> > > >>> > > Well I try to read that offset via kafka-console-consumer.sh
> too
> > > and
> > > >>> it
> > > >>> > > fails with same error.
> > > >>> > >
> > > >>> > > So was wondering if I can apply any of the suggestion as per
> > > >>> > >
> > > >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > > >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > > >>> > >
> > > >>> > > If there is any other was just to get the contents of that
> > message
> > > it
> > > >>> > would
> > > >>> > > be helpful.
> > > >>> > >
> > > >>> > > Thanks
> > > >>> > > Sachin
> > > >>> > >
> > > >>> > >
> > > >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <
> > damian.guy@gmail.com>
> > > >>> > wrote:
> > > >>> > >
> > > >>> > > > Hi Sachin,
> > > >>> > > >
> > > >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> > > that
> > > >>> > offset
> > > >>> > > > on the topic and seeing what the message is? Might be easier
> to
> > > >>> debug?
> > > >>> > > Like
> > > >>> > > > you say, it is failing in the consumer.
> > > >>> > > > Thanks,
> > > >>> > > > Damian
> > > >>> > > >
> > > >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <
> sjmittal@gmail.com
> > >
> > > >>> wrote:
> > > >>> > > >
> > > >>> > > > > I think I am following the third option.
> > > >>> > > > >
> > > >>> > > > > My pipeline is:
> > > >>> > > > >
> > > >>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new
> VDeserializer
> > > ());
> > > >>> > > > >
> > > >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > >>> > > > >   .filter(new Predicate<String, V>() { ...})
> > > >>> > > > >   .groupByKey()
> > > >>> > > > >   .aggregate(new Initializer<V1>() {...}, new
> > > Aggregator<String,
> > > >>> V,
> > > >>> > > V1>()
> > > >>> > > > > {...}, windows, supplier)
> > > >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > > >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {...
> > });
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > > and In VDeserializer (implements Deserializer<V>) I am
> doing
> > > >>> > something
> > > >>> > > > like
> > > >>> > > > > this:
> > > >>> > > > >
> > > >>> > > > >     public V deserialize(String paramString, byte[]
> > > >>> > paramArrayOfByte) {
> > > >>> > > > >         if (paramArrayOfByte == null) { return null;}
> > > >>> > > > >         V data = null;
> > > >>> > > > >         try {
> > > >>> > > > >             data = objectMapper.readValue(paramArrayOfByte,
> > new
> > > >>> > > > > TypeReference<V>() {});
> > > >>> > > > >         } catch (Exception e) {
> > > >>> > > > >             e.printStackTrace();
> > > >>> > > > >         }
> > > >>> > > > >         return data;
> > > >>> > > > >     }
> > > >>> > > > >
> > > >>> > > > > So I am catching any exception that may happen when
> > > >>> deserializing the
> > > >>> > > > data.
> > > >>> > > > >
> > > >>> > > > > This is what third option suggest (if I am not mistaken).
> > > >>> > > > >
> > > >>> > > > > Please let me know given the pipeline we which option would
> > be
> > > >>> best
> > > >>> > and
> > > >>> > > > how
> > > >>> > > > > can we incorporate that in our pipeline.
> > > >>> > > > >
> > > >>> > > > > Also not exception is happening when reading from source
> > topic
> > > >>> which
> > > >>> > is
> > > >>> > > > > "advice-stream", so looks like flow is not going to
> pipeline
> > at
> > > >>> all
> > > >>> > for
> > > >>> > > > us
> > > >>> > > > > to handle. It is terminating right at consumer poll.
> > > >>> > > > >
> > > >>> > > > > Thanks
> > > >>> > > > > Sachin
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
> > > >>> michael@confluent.io>
> > > >>> > > > > wrote:
> > > >>> > > > >
> > > >>> > > > > > Could this be a corrupted message ("poison pill") in your
> > > >>> topic?
> > > >>> > > > > >
> > > >>> > > > > > If so, take a look at
> > > >>> > > > > > http://docs.confluent.io/current/streams/faq.html#
> > > >>> > > > > >
> > > >>> > > > > handling-corrupted-records-and-deserialization-errors-
> > > >>> > > > poison-pill-messages
> > > >>> > > > > >
> > > >>> > > > > > FYI: We're currently investigating a more elegant way to
> > > >>> address
> > > >>> > such
> > > >>> > > > > > poison pill problems.  If you have feedback on that
> front,
> > > feel
> > > >>> > free
> > > >>> > > to
> > > >>> > > > > > share it with us. :-)
> > > >>> > > > > >
> > > >>> > > > > > -Michael
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> > > >>> > sjmittal@gmail.com>
> > > >>> > > > > > wrote:
> > > >>> > > > > >
> > > >>> > > > > > > Hi,
> > > >>> > > > > > > This is for first time we are getting a weird
> exception.
> > > >>> > > > > > > After this the streams caches.
> > > >>> > > > > > >
> > > >>> > > > > > > Only work around is to manually seek and commit offset
> > to a
> > > >>> > greater
> > > >>> > > > > > number
> > > >>> > > > > > > and we are needing this manual intervention again and
> > > again.
> > > >>> > > > > > >
> > > >>> > > > > > > Any idea what is causing it and how can we circumvent
> > this.
> > > >>> > > > > > >
> > > >>> > > > > > > Note this error happens in both cases when 10.2 client
> or
> > > >>> 10.1.1
> > > >>> > > > client
> > > >>> > > > > > > connect to kafka server 10.1.1
> > > >>> > > > > > >
> > > >>> > > > > > > So this does not looks like version issue.
> > > >>> > > > > > >
> > > >>> > > > > > > Also we have following setting
> > > >>> > > > > > > message.max.bytes=5000013
> > > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> > "5048576"
> > > >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > >>> > > > > > >
> > > >>> > > > > > > Rest is all default and also increasing the value for
> > > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not
> > > >>> help.
> > > >>> > > > > > >
> > > >>> > > > > > > Stack trace below.
> > > >>> > > > > > >
> > > >>> > > > > > > Thanks
> > > >>> > > > > > > Sachin
> > > >>> > > > > > >
> > > >>> > > > > > >
> > > >>> > > > > > > org.apache.kafka.common.errors.SerializationException:
> > > Error
> > > >>> > > > > > deserializing
> > > >>> > > > > > > key/value for partition advice-stream-6 at offset
> > 45153795
> > > >>> > > > > > > java.lang.IllegalArgumentException: null
> > > >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275)
> > > >>> ~[na:1.8.0_122-ea]
> > > >>> > > > > > >   at org.apache.kafka.common.utils.
> > > >>> Utils.sizeDelimited(Utils.
> > > >>> > > > java:791)
> > > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at
> org.apache.kafka.common.record.Record.value(Record.
> > > >>> > java:268)
> > > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.clients.
> > consumer.internals.Fetcher.
> > > >>> > > > > > > parseRecord(Fetcher.java:867)
> > > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.clients.
> > consumer.internals.Fetcher.
> > > >>> > > > > > > parseCompletedFetch(Fetcher.java:775)
> > > >>> > ~[kafka-clients-0.10.2.0.jar:
> > > >>> > > > na]
> > > >>> > > > > > >   at org.apache.kafka.clients.
> > consumer.internals.Fetcher.
> > > >>> > > > > > > fetchedRecords(Fetcher.java:473)
> > > >>> ~[kafka-clients-0.10.2.0.jar:
> > > >>> > na]
> > > >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > >>> > > > > > > pollOnce(KafkaConsumer.java:1062)
> > > >>> ~[kafka-clients-0.10.2.0.jar:
> > > >>> > na]
> > > >>> > > > > > >   at org.apache.kafka.clients.
> > consumer.KafkaConsumer.poll(
> > > >>> > > > > > > KafkaConsumer.java:995)
> > > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> > > >>> > > > StreamThread.runLoop(
> > > >>> > > > > > > StreamThread.java:592)
> > > >>> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> > > >>> > > > > > > StreamThread.run(StreamThread.java:378)
> > > >>> > ~[kafka-streams-0.10.2.1-
> > > >>> > > > > > > SNAPSHOT.jar:na]
> > > >>> > > > > > >
> > > >>> > > > > >
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Sachin Mittal <sj...@gmail.com>.
Damian,
Is there any way I can just dump out the contents at a given offset
from a given log segment file?

I am not sure how DumpLogSegment
<https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment>
helps. I already know which log segment file that message is in. The thing is,
the file is 1 GB in size and there is no easy way to inspect it and actually see
what the payload is.
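
(A bare-bones non-streams consumer that seeks straight to that offset and tries to
read the raw bytes would look roughly like the sketch below - broker address and
group id are placeholders. As noted further down the thread, though, if the record
itself is corrupt this fails with the same error inside the Fetcher, before any
deserializer runs.)

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class InspectOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-inspector");        // throwaway group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        TopicPartition tp = new TopicPartition("advice-stream", 6);

        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 45153795L); // the offset from the error message
            ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
            for (ConsumerRecord<byte[], byte[]> r : records) {
                // print only sizes; the value bytes themselves may not be printable
                System.out.printf("offset=%d keyBytes=%d valueBytes=%d%n",
                        r.offset(),
                        r.key() == null ? -1 : r.key().length,
                        r.value() == null ? -1 : r.value().length);
            }
        }
    }
}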

Thanks
Sachin


On Thu, Mar 30, 2017 at 7:18 PM, Damian Guy <da...@gmail.com> wrote:

> Sachin,
>
> Not sure if this will help, but you might want to try running
> https://cwiki.apache.org/confluence/display/KAFKA/
> System+Tools#SystemTools-DumpLogSegment
> on the partition that is causing you problems.
>
> Thanks
> Damian
>
> On Thu, 30 Mar 2017 at 14:29 Michael Noll <mi...@confluent.io> wrote:
>
> > Sachin,
> >
> > there's a JIRA that seems related to what you're seeing:
> > https://issues.apache.org/jira/browse/KAFKA-4740
> >
> > Perhaps you could check the above and report back?
> >
> > -Michael
> >
> >
> >
> >
> > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> > > Hmm, I re-read the stacktrace again. It does look like the value-side
> > > being the culprit (as Sachin suggested earlier).
> > >
> > > -Michael
> > >
> > >
> > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mi...@confluent.io>
> > > wrote:
> > >
> > >> Sachin,
> > >>
> > >> you have this line:
> > >>
> > >> > builder.stream(Serdes.String(), serde, "advice-stream")
> > >>
> > >> Could the problem be that not the record values are causing the
> problem
> > >> -- because your value deserializer does try-catch any such errors --
> but
> > >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> > does
> > >> not try-catch deserialization errors, and from a quick look at the
> > source
> > >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> > >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> > >> error above ("Error deserializing key/value for partition..."), and
> the
> > >> Fetcher is swallowing the more specific SerializationException of
> > >> `String.Serdes()` (but it will include the original
> exception/Throwable
> > in
> > >> its own SerializationException).
> > >>
> > >> -Michael
> > >>
> > >>
> > >>
> > >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com>
> > >> wrote:
> > >>
> > >>> My streams application does run in debug mode only.
> > >>> Also I have checked the code around these lines
> > >>>
> > >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> java:791)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> > >>> ord(Fetcher.java:867)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>
> > >>> I don't see any log statement which will give me more information.
> > >>>
> > >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> > >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > >>>
> > >>> The issue is happening at this line and perhaps handling the
> exception
> > >>> and
> > >>> setting the value to be null may be better options.
> > >>> Yes at client side nothing can be done because exception is happening
> > >>> before this.valueDeserializer.deserialize can be called.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> > >>> message
> > >>> > has already been received into Streams.
> > >>> > You could create a simple app that uses the Consumer, seeks to the
> > >>> offset,
> > >>> > and tries to read the message. If you did this in debug mode you
> > might
> > >>> find
> > >>> > out some more information.
> > >>> >
> > >>> >
> > >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com>
> > wrote:
> > >>> >
> > >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> > and
> > >>> it
> > >>> > > fails with same error.
> > >>> > >
> > >>> > > So was wondering if I can apply any of the suggestion as per
> > >>> > >
> > >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > >>> > >
> > >>> > > If there is any other was just to get the contents of that
> message
> > it
> > >>> > would
> > >>> > > be helpful.
> > >>> > >
> > >>> > > Thanks
> > >>> > > Sachin
> > >>> > >
> > >>> > >
> > >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <
> damian.guy@gmail.com>
> > >>> > wrote:
> > >>> > >
> > >>> > > > Hi Sachin,
> > >>> > > >
> > >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> > that
> > >>> > offset
> > >>> > > > on the topic and seeing what the message is? Might be easier to
> > >>> debug?
> > >>> > > Like
> > >>> > > > you say, it is failing in the consumer.
> > >>> > > > Thanks,
> > >>> > > > Damian
> > >>> > > >
> > >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmittal@gmail.com
> >
> > >>> wrote:
> > >>> > > >
> > >>> > > > > I think I am following the third option.
> > >>> > > > >
> > >>> > > > > My pipeline is:
> > >>> > > > >
> > >>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer
> > ());
> > >>> > > > >
> > >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > >>> > > > >   .filter(new Predicate<String, V>() { ...})
> > >>> > > > >   .groupByKey()
> > >>> > > > >   .aggregate(new Initializer<V1>() {...}, new
> > Aggregator<String,
> > >>> V,
> > >>> > > V1>()
> > >>> > > > > {...}, windows, supplier)
> > >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {...
> });
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > and In VDeserializer (implements Deserializer<V>) I am doing
> > >>> > something
> > >>> > > > like
> > >>> > > > > this:
> > >>> > > > >
> > >>> > > > >     public V deserialize(String paramString, byte[]
> > >>> > paramArrayOfByte) {
> > >>> > > > >         if (paramArrayOfByte == null) { return null;}
> > >>> > > > >         V data = null;
> > >>> > > > >         try {
> > >>> > > > >             data = objectMapper.readValue(paramArrayOfByte,
> new
> > >>> > > > > TypeReference<V>() {});
> > >>> > > > >         } catch (Exception e) {
> > >>> > > > >             e.printStackTrace();
> > >>> > > > >         }
> > >>> > > > >         return data;
> > >>> > > > >     }
> > >>> > > > >
> > >>> > > > > So I am catching any exception that may happen when
> > >>> deserializing the
> > >>> > > > data.
> > >>> > > > >
> > >>> > > > > This is what third option suggest (if I am not mistaken).
> > >>> > > > >
> > >>> > > > > Please let me know given the pipeline we which option would
> be
> > >>> best
> > >>> > and
> > >>> > > > how
> > >>> > > > > can we incorporate that in our pipeline.
> > >>> > > > >
> > >>> > > > > Also not exception is happening when reading from source
> topic
> > >>> which
> > >>> > is
> > >>> > > > > "advice-stream", so looks like flow is not going to pipeline
> at
> > >>> all
> > >>> > for
> > >>> > > > us
> > >>> > > > > to handle. It is terminating right at consumer poll.
> > >>> > > > >
> > >>> > > > > Thanks
> > >>> > > > > Sachin
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
> > >>> michael@confluent.io>
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > > > Could this be a corrupted message ("poison pill") in your
> > >>> topic?
> > >>> > > > > >
> > >>> > > > > > If so, take a look at
> > >>> > > > > > http://docs.confluent.io/current/streams/faq.html#
> > >>> > > > > >
> > >>> > > > > handling-corrupted-records-and-deserialization-errors-
> > >>> > > > poison-pill-messages
> > >>> > > > > >
> > >>> > > > > > FYI: We're currently investigating a more elegant way to
> > >>> address
> > >>> > such
> > >>> > > > > > poison pill problems.  If you have feedback on that front,
> > feel
> > >>> > free
> > >>> > > to
> > >>> > > > > > share it with us. :-)
> > >>> > > > > >
> > >>> > > > > > -Michael
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> > >>> > sjmittal@gmail.com>
> > >>> > > > > > wrote:
> > >>> > > > > >
> > >>> > > > > > > Hi,
> > >>> > > > > > > This is for first time we are getting a weird exception.
> > >>> > > > > > > After this the streams caches.
> > >>> > > > > > >
> > >>> > > > > > > Only work around is to manually seek and commit offset
> to a
> > >>> > greater
> > >>> > > > > > number
> > >>> > > > > > > and we are needing this manual intervention again and
> > again.
> > >>> > > > > > >
> > >>> > > > > > > Any idea what is causing it and how can we circumvent
> this.
> > >>> > > > > > >
> > >>> > > > > > > Note this error happens in both cases when 10.2 client or
> > >>> 10.1.1
> > >>> > > > client
> > >>> > > > > > > connect to kafka server 10.1.1
> > >>> > > > > > >
> > >>> > > > > > > So this does not looks like version issue.
> > >>> > > > > > >
> > >>> > > > > > > Also we have following setting
> > >>> > > > > > > message.max.bytes=5000013
> > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> "5048576"
> > >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > >>> > > > > > >
> > >>> > > > > > > Rest is all default and also increasing the value for
> > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not
> > >>> help.
> > >>> > > > > > >
> > >>> > > > > > > Stack trace below.
> > >>> > > > > > >
> > >>> > > > > > > Thanks
> > >>> > > > > > > Sachin
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > org.apache.kafka.common.errors.SerializationException:
> > Error
> > >>> > > > > > deserializing
> > >>> > > > > > > key/value for partition advice-stream-6 at offset
> 45153795
> > >>> > > > > > > java.lang.IllegalArgumentException: null
> > >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275)
> > >>> ~[na:1.8.0_122-ea]
> > >>> > > > > > >   at org.apache.kafka.common.utils.
> > >>> Utils.sizeDelimited(Utils.
> > >>> > > > java:791)
> > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.
> > >>> > java:268)
> > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.
> consumer.internals.Fetcher.
> > >>> > > > > > > parseRecord(Fetcher.java:867)
> > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.
> consumer.internals.Fetcher.
> > >>> > > > > > > parseCompletedFetch(Fetcher.java:775)
> > >>> > ~[kafka-clients-0.10.2.0.jar:
> > >>> > > > na]
> > >>> > > > > > >   at org.apache.kafka.clients.
> consumer.internals.Fetcher.
> > >>> > > > > > > fetchedRecords(Fetcher.java:473)
> > >>> ~[kafka-clients-0.10.2.0.jar:
> > >>> > na]
> > >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > >>> > > > > > > pollOnce(KafkaConsumer.java:1062)
> > >>> ~[kafka-clients-0.10.2.0.jar:
> > >>> > na]
> > >>> > > > > > >   at org.apache.kafka.clients.
> consumer.KafkaConsumer.poll(
> > >>> > > > > > > KafkaConsumer.java:995)
> > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> > >>> > > > StreamThread.runLoop(
> > >>> > > > > > > StreamThread.java:592)
> > >>> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> > >>> > > > > > > StreamThread.run(StreamThread.java:378)
> > >>> > ~[kafka-streams-0.10.2.1-
> > >>> > > > > > > SNAPSHOT.jar:na]
> > >>> > > > > > >
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >>
> > >
> > >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Damian Guy <da...@gmail.com>.
Sachin,

Not sure if this will help, but you might want to try running
https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment
on the partition that is causing you problems.

Thanks
Damian

On Thu, 30 Mar 2017 at 14:29 Michael Noll <mi...@confluent.io> wrote:

> Sachin,
>
> there's a JIRA that seems related to what you're seeing:
> https://issues.apache.org/jira/browse/KAFKA-4740
>
> Perhaps you could check the above and report back?
>
> -Michael
>
>
>
>
> On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > Hmm, I re-read the stacktrace again. It does look like the value-side
> > being the culprit (as Sachin suggested earlier).
> >
> > -Michael
> >
> >
> > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> you have this line:
> >>
> >> > builder.stream(Serdes.String(), serde, "advice-stream")
> >>
> >> Could the problem be that not the record values are causing the problem
> >> -- because your value deserializer does try-catch any such errors -- but
> >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> does
> >> not try-catch deserialization errors, and from a quick look at the
> source
> >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> >> error above ("Error deserializing key/value for partition..."), and the
> >> Fetcher is swallowing the more specific SerializationException of
> >> `String.Serdes()` (but it will include the original exception/Throwable
> in
> >> its own SerializationException).
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com>
> >> wrote:
> >>
> >>> My streams application does run in debug mode only.
> >>> Also I have checked the code around these lines
> >>>
> >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> >>> ord(Fetcher.java:867)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>
> >>> I don't see any log statement which will give me more information.
> >>>
> >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> >>>
> >>> The issue is happening at this line and perhaps handling the exception
> >>> and
> >>> setting the value to be null may be better options.
> >>> Yes at client side nothing can be done because exception is happening
> >>> before this.valueDeserializer.deserialize can be called.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com>
> >>> wrote:
> >>>
> >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> >>> message
> >>> > has already been received into Streams.
> >>> > You could create a simple app that uses the Consumer, seeks to the
> >>> offset,
> >>> > and tries to read the message. If you did this in debug mode you
> might
> >>> find
> >>> > out some more information.
> >>> >
> >>> >
> >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com>
> wrote:
> >>> >
> >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> and
> >>> it
> >>> > > fails with same error.
> >>> > >
> >>> > > So was wondering if I can apply any of the suggestion as per
> >>> > >
> >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > >
> >>> > > If there is any other was just to get the contents of that message
> it
> >>> > would
> >>> > > be helpful.
> >>> > >
> >>> > > Thanks
> >>> > > Sachin
> >>> > >
> >>> > >
> >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com>
> >>> > wrote:
> >>> > >
> >>> > > > Hi Sachin,
> >>> > > >
> >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> that
> >>> > offset
> >>> > > > on the topic and seeing what the message is? Might be easier to
> >>> debug?
> >>> > > Like
> >>> > > > you say, it is failing in the consumer.
> >>> > > > Thanks,
> >>> > > > Damian
> >>> > > >
> >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com>
> >>> wrote:
> >>> > > >
> >>> > > > > I think I am following the third option.
> >>> > > > >
> >>> > > > > My pipeline is:
> >>> > > > >
> >>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer
> ());
> >>> > > > >
> >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> >>> > > > >   .filter(new Predicate<String, V>() { ...})
> >>> > > > >   .groupByKey()
> >>> > > > >   .aggregate(new Initializer<V1>() {...}, new
> Aggregator<String,
> >>> V,
> >>> > > V1>()
> >>> > > > > {...}, windows, supplier)
> >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> >>> > > > >
> >>> > > > >
> >>> > > > > and In VDeserializer (implements Deserializer<V>) I am doing
> >>> > something
> >>> > > > like
> >>> > > > > this:
> >>> > > > >
> >>> > > > >     public V deserialize(String paramString, byte[]
> >>> > paramArrayOfByte) {
> >>> > > > >         if (paramArrayOfByte == null) { return null;}
> >>> > > > >         V data = null;
> >>> > > > >         try {
> >>> > > > >             data = objectMapper.readValue(paramArrayOfByte, new
> >>> > > > > TypeReference<V>() {});
> >>> > > > >         } catch (Exception e) {
> >>> > > > >             e.printStackTrace();
> >>> > > > >         }
> >>> > > > >         return data;
> >>> > > > >     }
> >>> > > > >
> >>> > > > > So I am catching any exception that may happen when
> >>> deserializing the
> >>> > > > data.
> >>> > > > >
> >>> > > > > This is what third option suggest (if I am not mistaken).
> >>> > > > >
> >>> > > > > Please let me know given the pipeline we which option would be
> >>> best
> >>> > and
> >>> > > > how
> >>> > > > > can we incorporate that in our pipeline.
> >>> > > > >
> >>> > > > > Also not exception is happening when reading from source topic
> >>> which
> >>> > is
> >>> > > > > "advice-stream", so looks like flow is not going to pipeline at
> >>> all
> >>> > for
> >>> > > > us
> >>> > > > > to handle. It is terminating right at consumer poll.
> >>> > > > >
> >>> > > > > Thanks
> >>> > > > > Sachin
> >>> > > > >
> >>> > > > >
> >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
> >>> michael@confluent.io>
> >>> > > > > wrote:
> >>> > > > >
> >>> > > > > > Could this be a corrupted message ("poison pill") in your
> >>> topic?
> >>> > > > > >
> >>> > > > > > If so, take a look at
> >>> > > > > > http://docs.confluent.io/current/streams/faq.html#
> >>> > > > > >
> >>> > > > > handling-corrupted-records-and-deserialization-errors-
> >>> > > > poison-pill-messages
> >>> > > > > >
> >>> > > > > > FYI: We're currently investigating a more elegant way to
> >>> address
> >>> > such
> >>> > > > > > poison pill problems.  If you have feedback on that front,
> feel
> >>> > free
> >>> > > to
> >>> > > > > > share it with us. :-)
> >>> > > > > >
> >>> > > > > > -Michael
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> >>> > sjmittal@gmail.com>
> >>> > > > > > wrote:
> >>> > > > > >
> >>> > > > > > > Hi,
> >>> > > > > > > This is for first time we are getting a weird exception.
> >>> > > > > > > After this the streams caches.
> >>> > > > > > >
> >>> > > > > > > Only work around is to manually seek and commit offset to a
> >>> > greater
> >>> > > > > > number
> >>> > > > > > > and we are needing this manual intervention again and
> again.
> >>> > > > > > >
> >>> > > > > > > Any idea what is causing it and how can we circumvent this.
> >>> > > > > > >
> >>> > > > > > > Note this error happens in both cases when 10.2 client or
> >>> 10.1.1
> >>> > > > client
> >>> > > > > > > connect to kafka server 10.1.1
> >>> > > > > > >
> >>> > > > > > > So this does not looks like version issue.
> >>> > > > > > >
> >>> > > > > > > Also we have following setting
> >>> > > > > > > message.max.bytes=5000013
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> >>> > > > > > >
> >>> > > > > > > Rest is all default and also increasing the value for
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not
> >>> help.
> >>> > > > > > >
> >>> > > > > > > Stack trace below.
> >>> > > > > > >
> >>> > > > > > > Thanks
> >>> > > > > > > Sachin
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > org.apache.kafka.common.errors.SerializationException:
> Error
> >>> > > > > > deserializing
> >>> > > > > > > key/value for partition advice-stream-6 at offset 45153795
> >>> > > > > > > java.lang.IllegalArgumentException: null
> >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275)
> >>> ~[na:1.8.0_122-ea]
> >>> > > > > > >   at org.apache.kafka.common.utils.
> >>> Utils.sizeDelimited(Utils.
> >>> > > > java:791)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.
> >>> > java:268)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > parseRecord(Fetcher.java:867)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > parseCompletedFetch(Fetcher.java:775)
> >>> > ~[kafka-clients-0.10.2.0.jar:
> >>> > > > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > fetchedRecords(Fetcher.java:473)
> >>> ~[kafka-clients-0.10.2.0.jar:
> >>> > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> >>> > > > > > > pollOnce(KafkaConsumer.java:1062)
> >>> ~[kafka-clients-0.10.2.0.jar:
> >>> > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >>> > > > > > > KafkaConsumer.java:995)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> >>> > > > StreamThread.runLoop(
> >>> > > > > > > StreamThread.java:592)
> >>> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> >>> > > > > > > StreamThread.run(StreamThread.java:378)
> >>> > ~[kafka-streams-0.10.2.1-
> >>> > > > > > > SNAPSHOT.jar:na]
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >>
> >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Guozhang Wang <wa...@gmail.com>.
Sachin,

We are discussing how to work around KAFKA-4740 for poison pill records:

https://issues.apache.org/jira/browse/KAFKA-5157

Please share your scenario and your opinions on the solution there.


Guozhang

On Tue, May 16, 2017 at 9:50 PM, Sachin Mittal <sj...@gmail.com> wrote:

> Folks is there any updated on
> https://issues.apache.org/jira/browse/KAFKA-4740.
> This is now so far only pressing issue for our streams application.
>
> It happens from time to time and so far our only solution is to increase
> the offset of the group for that partition beyond the offset that caused
> this issue.
> Other thing I have noticed is that such (poison pill) messages then to
> bunch together. So usually we need to increase the offset by say 10000 to
> get past the issue.
> So basically we loose processing of these many messages. If we increase the
> offset by just one then we again see another such offset few offsets higher
> and so on.
>
> So in order to prevent multiple manual restart of the streams application
> we simply increase the offset by 10000. Then streams application works
> uninterrupted for few months and one fine day we again see such messages.
>
> As the bug is critical shouldn't we get some fix or workaround in next
> release.
>
> Thanks
> Sachin
>
>
>
>
>
> On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > Sachin,
> >
> > there's a JIRA that seems related to what you're seeing:
> > https://issues.apache.org/jira/browse/KAFKA-4740
> >
> > Perhaps you could check the above and report back?
> >
> > -Michael
> >
> >
> >
> >
> > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> > > Hmm, I re-read the stacktrace again. It does look like the value-side
> > > being the culprit (as Sachin suggested earlier).
> > >
> > > -Michael
> > >
> > >
> > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mi...@confluent.io>
> > > wrote:
> > >
> > >> Sachin,
> > >>
> > >> you have this line:
> > >>
> > >> > builder.stream(Serdes.String(), serde, "advice-stream")
> > >>
> > >> Could the problem be that not the record values are causing the
> problem
> > >> -- because your value deserializer does try-catch any such errors --
> but
> > >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> > does
> > >> not try-catch deserialization errors, and from a quick look at the
> > source
> > >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> > >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> > >> error above ("Error deserializing key/value for partition..."), and
> the
> > >> Fetcher is swallowing the more specific SerializationException of
> > >> `String.Serdes()` (but it will include the original
> exception/Throwable
> > in
> > >> its own SerializationException).
> > >>
> > >> -Michael
> > >>
> > >>
> > >>
> > >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com>
> > >> wrote:
> > >>
> > >>> My streams application does run in debug mode only.
> > >>> Also I have checked the code around these lines
> > >>>
> > >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> java:791)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> > >>> ord(Fetcher.java:867)
> > >>> ~[kafka-clients-0.10.2.0.jar:na]
> > >>>
> > >>> I don't see any log statement which will give me more information.
> > >>>
> > >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> > >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > >>>
> > >>> The issue is happening at this line and perhaps handling the
> exception
> > >>> and
> > >>> setting the value to be null may be better options.
> > >>> Yes at client side nothing can be done because exception is happening
> > >>> before this.valueDeserializer.deserialize can be called.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> > >>> message
> > >>> > has already been received into Streams.
> > >>> > You could create a simple app that uses the Consumer, seeks to the
> > >>> offset,
> > >>> > and tries to read the message. If you did this in debug mode you
> > might
> > >>> find
> > >>> > out some more information.
> > >>> >
> > >>> >
> > >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com>
> > wrote:
> > >>> >
> > >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> > and
> > >>> it
> > >>> > > fails with same error.
> > >>> > >
> > >>> > > So was wondering if I can apply any of the suggestion as per
> > >>> > >
> > >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > >>> > >
> > >>> > > If there is any other was just to get the contents of that
> message
> > it
> > >>> > would
> > >>> > > be helpful.
> > >>> > >
> > >>> > > Thanks
> > >>> > > Sachin
> > >>> > >
> > >>> > >
> > >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <
> damian.guy@gmail.com>
> > >>> > wrote:
> > >>> > >
> > >>> > > > Hi Sachin,
> > >>> > > >
> > >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> > that
> > >>> > offset
> > >>> > > > on the topic and seeing what the message is? Might be easier to
> > >>> debug?
> > >>> > > Like
> > >>> > > > you say, it is failing in the consumer.
> > >>> > > > Thanks,
> > >>> > > > Damian
> > >>> > > >
> > >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmittal@gmail.com
> >
> > >>> wrote:
> > >>> > > >
> > >>> > > > > I think I am following the third option.
> > >>> > > > >
> > >>> > > > > My pipeline is:
> > >>> > > > >
> > >>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer
> > ());
> > >>> > > > >
> > >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > >>> > > > >   .filter(new Predicate<String, V>() { ...})
> > >>> > > > >   .groupByKey()
> > >>> > > > >   .aggregate(new Initializer<V1>() {...}, new
> > Aggregator<String,
> > >>> V,
> > >>> > > V1>()
> > >>> > > > > {...}, windows, supplier)
> > >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {...
> });
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > and In VDeserializer (implements Deserializer<V>) I am doing
> > >>> > something
> > >>> > > > like
> > >>> > > > > this:
> > >>> > > > >
> > >>> > > > >     public V deserialize(String paramString, byte[]
> > >>> > paramArrayOfByte) {
> > >>> > > > >         if (paramArrayOfByte == null) { return null;}
> > >>> > > > >         V data = null;
> > >>> > > > >         try {
> > >>> > > > >             data = objectMapper.readValue(paramArrayOfByte,
> > new
> > >>> > > > > TypeReference<V>() {});
> > >>> > > > >         } catch (Exception e) {
> > >>> > > > >             e.printStackTrace();
> > >>> > > > >         }
> > >>> > > > >         return data;
> > >>> > > > >     }
> > >>> > > > >
> > >>> > > > > So I am catching any exception that may happen when
> > >>> deserializing the
> > >>> > > > data.
> > >>> > > > >
> > >>> > > > > This is what third option suggest (if I am not mistaken).
> > >>> > > > >
> > >>> > > > > Please let me know given the pipeline we which option would
> be
> > >>> best
> > >>> > and
> > >>> > > > how
> > >>> > > > > can we incorporate that in our pipeline.
> > >>> > > > >
> > >>> > > > > Also not exception is happening when reading from source
> topic
> > >>> which
> > >>> > is
> > >>> > > > > "advice-stream", so looks like flow is not going to pipeline
> at
> > >>> all
> > >>> > for
> > >>> > > > us
> > >>> > > > > to handle. It is terminating right at consumer poll.
> > >>> > > > >
> > >>> > > > > Thanks
> > >>> > > > > Sachin
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
> > >>> michael@confluent.io>
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > > > Could this be a corrupted message ("poison pill") in your
> > >>> topic?
> > >>> > > > > >
> > >>> > > > > > If so, take a look at
> > >>> > > > > > http://docs.confluent.io/current/streams/faq.html#
> > >>> > > > > >
> > >>> > > > > handling-corrupted-records-and-deserialization-errors-
> > >>> > > > poison-pill-messages
> > >>> > > > > >
> > >>> > > > > > FYI: We're currently investigating a more elegant way to
> > >>> address
> > >>> > such
> > >>> > > > > > poison pill problems.  If you have feedback on that front,
> > feel
> > >>> > free
> > >>> > > to
> > >>> > > > > > share it with us. :-)
> > >>> > > > > >
> > >>> > > > > > -Michael
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > >
> > >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> > >>> > sjmittal@gmail.com>
> > >>> > > > > > wrote:
> > >>> > > > > >
> > >>> > > > > > > Hi,
> > >>> > > > > > > This is for first time we are getting a weird exception.
> > >>> > > > > > > After this the streams caches.
> > >>> > > > > > >
> > >>> > > > > > > Only work around is to manually seek and commit offset
> to a
> > >>> > greater
> > >>> > > > > > number
> > >>> > > > > > > and we are needing this manual intervention again and
> > again.
> > >>> > > > > > >
> > >>> > > > > > > Any idea what is causing it and how can we circumvent
> this.
> > >>> > > > > > >
> > >>> > > > > > > Note this error happens in both cases when 10.2 client or
> > >>> 10.1.1
> > >>> > > > client
> > >>> > > > > > > connect to kafka server 10.1.1
> > >>> > > > > > >
> > >>> > > > > > > So this does not looks like version issue.
> > >>> > > > > > >
> > >>> > > > > > > Also we have following setting
> > >>> > > > > > > message.max.bytes=5000013
> > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> "5048576"
> > >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > >>> > > > > > >
> > >>> > > > > > > Rest is all default and also increasing the value for
> > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not
> > >>> help.
> > >>> > > > > > >
> > >>> > > > > > > Stack trace below.
> > >>> > > > > > >
> > >>> > > > > > > Thanks
> > >>> > > > > > > Sachin
> > >>> > > > > > >
> > >>> > > > > > >
> > >>> > > > > > > org.apache.kafka.common.errors.SerializationException:
> > Error
> > >>> > > > > > deserializing
> > >>> > > > > > > key/value for partition advice-stream-6 at offset
> 45153795
> > >>> > > > > > > java.lang.IllegalArgumentException: null
> > >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275)
> > >>> ~[na:1.8.0_122-ea]
> > >>> > > > > > >   at org.apache.kafka.common.utils.
> > >>> Utils.sizeDelimited(Utils.
> > >>> > > > java:791)
> > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.
> > >>> > java:268)
> > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.
> consumer.internals.Fetcher.
> > >>> > > > > > > parseRecord(Fetcher.java:867)
> > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.clients.
> consumer.internals.Fetcher.
> > >>> > > > > > > parseCompletedFetch(Fetcher.java:775)
> > >>> > ~[kafka-clients-0.10.2.0.jar:
> > >>> > > > na]
> > >>> > > > > > >   at org.apache.kafka.clients.
> consumer.internals.Fetcher.
> > >>> > > > > > > fetchedRecords(Fetcher.java:473)
> > >>> ~[kafka-clients-0.10.2.0.jar:
> > >>> > na]
> > >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > >>> > > > > > > pollOnce(KafkaConsumer.java:1062)
> > >>> ~[kafka-clients-0.10.2.0.jar:
> > >>> > na]
> > >>> > > > > > >   at org.apache.kafka.clients.
> consumer.KafkaConsumer.poll(
> > >>> > > > > > > KafkaConsumer.java:995)
> > >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> > >>> > > > StreamThread.runLoop(
> > >>> > > > > > > StreamThread.java:592)
> > >>> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> > >>> > > > > > > StreamThread.run(StreamThread.java:378)
> > >>> > ~[kafka-streams-0.10.2.1-
> > >>> > > > > > > SNAPSHOT.jar:na]
> > >>> > > > > > >
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> > >>
> > >>
> > >
> > >
> >
>



-- 
-- Guozhang

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Sachin Mittal <sj...@gmail.com>.
Folks, is there any update on
https://issues.apache.org/jira/browse/KAFKA-4740?
This is by now the only pressing issue for our streams application.

It happens from time to time, and so far our only solution is to advance the
group's offset for that partition beyond the offset that caused the issue.
Another thing I have noticed is that such (poison pill) messages tend to bunch
together, so usually we need to increase the offset by, say, 10000 to get past
the issue. That means we lose the processing of that many messages; if we
increase the offset by just one, we soon hit another such offset a few offsets
higher, and so on.

So, to avoid multiple manual restarts of the streams application, we simply
increase the offset by 10000. The streams application then runs uninterrupted
for a few months, until one fine day we again see such messages.
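
(A rough sketch of that offset-bumping pass - placeholder names, a step of 10000,
and it assumes the streams application is stopped while it runs; a real tool would
bound the loop and log each jump.)

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class BumpPastPoisonPills {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");          // = streams application.id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        TopicPartition tp = new TopicPartition("advice-stream", 6);
        long offset = 45153795L; // first bad offset (placeholder)
        long step = 10000L;      // how far we jump each time

        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.assign(Collections.singletonList(tp));
            while (true) {
                consumer.seek(tp, offset);
                try {
                    consumer.poll(5000);  // parses cleanly once we are past the bad range
                    break;
                } catch (SerializationException e) {
                    offset += step;       // still in the bad range, jump further ahead
                }
            }
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offset)));
        }
    }
}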

As the bug is critical, shouldn't we get some fix or workaround in the next
release?

Thanks
Sachin





On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll <mi...@confluent.io> wrote:

> Sachin,
>
> there's a JIRA that seems related to what you're seeing:
> https://issues.apache.org/jira/browse/KAFKA-4740
>
> Perhaps you could check the above and report back?
>
> -Michael
>
>
>
>
> On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > Hmm, I re-read the stacktrace again. It does look like the value-side
> > being the culprit (as Sachin suggested earlier).
> >
> > -Michael
> >
> >
> > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> you have this line:
> >>
> >> > builder.stream(Serdes.String(), serde, "advice-stream")
> >>
> >> Could the problem be that not the record values are causing the problem
> >> -- because your value deserializer does try-catch any such errors -- but
> >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> does
> >> not try-catch deserialization errors, and from a quick look at the
> source
> >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> >> error above ("Error deserializing key/value for partition..."), and the
> >> Fetcher is swallowing the more specific SerializationException of
> >> `String.Serdes()` (but it will include the original exception/Throwable
> in
> >> its own SerializationException).
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com>
> >> wrote:
> >>
> >>> My streams application does run in debug mode only.
> >>> Also I have checked the code around these lines
> >>>
> >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> >>> ord(Fetcher.java:867)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>
> >>> I don't see any log statement which will give me more information.
> >>>
> >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> >>>
> >>> The issue is happening at this line and perhaps handling the exception
> >>> and
> >>> setting the value to be null may be better options.
> >>> Yes at client side nothing can be done because exception is happening
> >>> before this.valueDeserializer.deserialize can be called.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com>
> >>> wrote:
> >>>
> >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> >>> message
> >>> > has already been received into Streams.
> >>> > You could create a simple app that uses the Consumer, seeks to the
> >>> offset,
> >>> > and tries to read the message. If you did this in debug mode you
> might
> >>> find
> >>> > out some more information.
> >>> >
> >>> >
> >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com>
> wrote:
> >>> >
> >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> and
> >>> it
> >>> > > fails with same error.
> >>> > >
> >>> > > So was wondering if I can apply any of the suggestion as per
> >>> > >
> >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > >
> >>> > > If there is any other was just to get the contents of that message
> it
> >>> > would
> >>> > > be helpful.
> >>> > >
> >>> > > Thanks
> >>> > > Sachin
> >>> > >
> >>> > >
> >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com>
> >>> > wrote:
> >>> > >
> >>> > > > Hi Sachin,
> >>> > > >
> >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> that
> >>> > offset
> >>> > > > on the topic and seeing what the message is? Might be easier to
> >>> debug?
> >>> > > Like
> >>> > > > you say, it is failing in the consumer.
> >>> > > > Thanks,
> >>> > > > Damian
> >>> > > >
> >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com>
> >>> wrote:
> >>> > > >
> >>> > > > > I think I am following the third option.
> >>> > > > >
> >>> > > > > My pipeline is:
> >>> > > > >
> >>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer
> ());
> >>> > > > >
> >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> >>> > > > >   .filter(new Predicate<String, V>() { ...})
> >>> > > > >   .groupByKey()
> >>> > > > >   .aggregate(new Initializer<V1>() {...}, new
> Aggregator<String,
> >>> V,
> >>> > > V1>()
> >>> > > > > {...}, windows, supplier)
> >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> >>> > > > >
> >>> > > > >
> >>> > > > > and In VDeserializer (implements Deserializer<V>) I am doing
> >>> > something
> >>> > > > like
> >>> > > > > this:
> >>> > > > >
> >>> > > > >     public V deserialize(String paramString, byte[]
> >>> > paramArrayOfByte) {
> >>> > > > >         if (paramArrayOfByte == null) { return null;}
> >>> > > > >         V data = null;
> >>> > > > >         try {
> >>> > > > >             data = objectMapper.readValue(paramArrayOfByte,
> new
> >>> > > > > TypeReference<V>() {});
> >>> > > > >         } catch (Exception e) {
> >>> > > > >             e.printStackTrace();
> >>> > > > >         }
> >>> > > > >         return data;
> >>> > > > >     }
> >>> > > > >
> >>> > > > > So I am catching any exception that may happen when
> >>> deserializing the
> >>> > > > data.
> >>> > > > >
> >>> > > > > This is what third option suggest (if I am not mistaken).
> >>> > > > >
> >>> > > > > Please let me know given the pipeline we which option would be
> >>> best
> >>> > and
> >>> > > > how
> >>> > > > > can we incorporate that in our pipeline.
> >>> > > > >
> >>> > > > > Also not exception is happening when reading from source topic
> >>> which
> >>> > is
> >>> > > > > "advice-stream", so looks like flow is not going to pipeline at
> >>> all
> >>> > for
> >>> > > > us
> >>> > > > > to handle. It is terminating right at consumer poll.
> >>> > > > >
> >>> > > > > Thanks
> >>> > > > > Sachin
> >>> > > > >
> >>> > > > >
> >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
> >>> michael@confluent.io>
> >>> > > > > wrote:
> >>> > > > >
> >>> > > > > > Could this be a corrupted message ("poison pill") in your
> >>> topic?
> >>> > > > > >
> >>> > > > > > If so, take a look at
> >>> > > > > > http://docs.confluent.io/current/streams/faq.html#
> >>> > > > > >
> >>> > > > > handling-corrupted-records-and-deserialization-errors-
> >>> > > > poison-pill-messages
> >>> > > > > >
> >>> > > > > > FYI: We're currently investigating a more elegant way to
> >>> address
> >>> > such
> >>> > > > > > poison pill problems.  If you have feedback on that front,
> feel
> >>> > free
> >>> > > to
> >>> > > > > > share it with us. :-)
> >>> > > > > >
> >>> > > > > > -Michael
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> >>> > sjmittal@gmail.com>
> >>> > > > > > wrote:
> >>> > > > > >
> >>> > > > > > > Hi,
> >>> > > > > > > This is for first time we are getting a weird exception.
> >>> > > > > > > After this the streams caches.
> >>> > > > > > >
> >>> > > > > > > Only work around is to manually seek and commit offset to a
> >>> > greater
> >>> > > > > > number
> >>> > > > > > > and we are needing this manual intervention again and
> again.
> >>> > > > > > >
> >>> > > > > > > Any idea what is causing it and how can we circumvent this.
> >>> > > > > > >
> >>> > > > > > > Note this error happens in both cases when 10.2 client or
> >>> 10.1.1
> >>> > > > client
> >>> > > > > > > connect to kafka server 10.1.1
> >>> > > > > > >
> >>> > > > > > > So this does not looks like version issue.
> >>> > > > > > >
> >>> > > > > > > Also we have following setting
> >>> > > > > > > message.max.bytes=5000013
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> >>> > > > > > >
> >>> > > > > > > Rest is all default and also increasing the value for
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not
> >>> help.
> >>> > > > > > >
> >>> > > > > > > Stack trace below.
> >>> > > > > > >
> >>> > > > > > > Thanks
> >>> > > > > > > Sachin
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > org.apache.kafka.common.errors.SerializationException:
> Error
> >>> > > > > > deserializing
> >>> > > > > > > key/value for partition advice-stream-6 at offset 45153795
> >>> > > > > > > java.lang.IllegalArgumentException: null
> >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275)
> >>> ~[na:1.8.0_122-ea]
> >>> > > > > > >   at org.apache.kafka.common.utils.
> >>> Utils.sizeDelimited(Utils.
> >>> > > > java:791)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.
> >>> > java:268)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > parseRecord(Fetcher.java:867)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > parseCompletedFetch(Fetcher.java:775)
> >>> > ~[kafka-clients-0.10.2.0.jar:
> >>> > > > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > fetchedRecords(Fetcher.java:473)
> >>> ~[kafka-clients-0.10.2.0.jar:
> >>> > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> >>> > > > > > > pollOnce(KafkaConsumer.java:1062)
> >>> ~[kafka-clients-0.10.2.0.jar:
> >>> > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >>> > > > > > > KafkaConsumer.java:995)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> >>> > > > StreamThread.runLoop(
> >>> > > > > > > StreamThread.java:592)
> >>> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> >>> > > > > > > StreamThread.run(StreamThread.java:378)
> >>> > ~[kafka-streams-0.10.2.1-
> >>> > > > > > > SNAPSHOT.jar:na]
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >>
> >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Sachin Mittal <sj...@gmail.com>.
I am not sure if https://issues.apache.org/jira/browse/KAFKA-4740 is the same
issue as mine.
What I suspect may be happening is that:
  at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
  at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.common.record.Record.value(Record.java:268)
~[kafka-clients-0.10.2.0.jar:na

The size of the value does not match the actual buffer size.

Looks like the producer stored the message value size as

4 byte payload length, containing length V
V byte payload

The length does not match the actual payload size.
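
As a rough, self-contained illustration of that suspicion (this is not Kafka
source code, just a sketch assuming a 4-byte length prefix followed by the
payload), a declared value length larger than the bytes actually present makes
Buffer.limit() throw exactly the IllegalArgumentException from the stack trace:

import java.nio.ByteBuffer;

public class SizeMismatchRepro {
    public static void main(String[] args) {
        // Build a size-delimited value whose declared length (100) exceeds
        // the bytes that actually follow (4).
        ByteBuffer record = ByteBuffer.allocate(8);
        record.putInt(100);                  // declared value length
        record.put(new byte[] {1, 2, 3, 4}); // actual payload
        record.flip();

        int declaredSize = record.getInt();  // read the length prefix
        ByteBuffer value = record.slice();   // only 4 bytes remain
        // limit > capacity -> java.lang.IllegalArgumentException, matching
        // the failure at java.nio.Buffer.limit(Buffer.java:275)
        value.limit(declaredSize);
    }
}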

Also, we saw this issue for the first time, even though we have been running
the streams application for over six months now. It happens only for a few
messages.

We may need to catch such exceptions when getting the byte buffer for
keys/values, perhaps return null, and let the higher-level client handle
such cases.

Thanks
Sachin



On Thu, Mar 30, 2017 at 6:59 PM, Michael Noll <mi...@confluent.io> wrote:

> Sachin,
>
> there's a JIRA that seems related to what you're seeing:
> https://issues.apache.org/jira/browse/KAFKA-4740
>
> Perhaps you could check the above and report back?
>
> -Michael
>
>
>
>
> On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > Hmm, I re-read the stacktrace again. It does look like the value-side
> > being the culprit (as Sachin suggested earlier).
> >
> > -Michael
> >
> >
> > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> you have this line:
> >>
> >> > builder.stream(Serdes.String(), serde, "advice-stream")
> >>
> >> Could the problem be that not the record values are causing the problem
> >> -- because your value deserializer does try-catch any such errors -- but
> >> that the record *keys* are malformed?  The built-in `Serdes.String()`
> does
> >> not try-catch deserialization errors, and from a quick look at the
> source
> >> it seems that the `Fetcher` class (clients/src/main/java/org/apa
> >> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> >> error above ("Error deserializing key/value for partition..."), and the
> >> Fetcher is swallowing the more specific SerializationException of
> >> `String.Serdes()` (but it will include the original exception/Throwable
> in
> >> its own SerializationException).
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com>
> >> wrote:
> >>
> >>> My streams application does run in debug mode only.
> >>> Also I have checked the code around these lines
> >>>
> >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
> >>> ord(Fetcher.java:867)
> >>> ~[kafka-clients-0.10.2.0.jar:na]
> >>>
> >>> I don't see any log statement which will give me more information.
> >>>
> >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
> >>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> >>>
> >>> The issue is happening at this line and perhaps handling the exception
> >>> and
> >>> setting the value to be null may be better options.
> >>> Yes at client side nothing can be done because exception is happening
> >>> before this.valueDeserializer.deserialize can be called.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com>
> >>> wrote:
> >>>
> >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> >>> message
> >>> > has already been received into Streams.
> >>> > You could create a simple app that uses the Consumer, seeks to the
> >>> offset,
> >>> > and tries to read the message. If you did this in debug mode you
> might
> >>> find
> >>> > out some more information.
> >>> >
> >>> >
> >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com>
> wrote:
> >>> >
> >>> > > Well I try to read that offset via kafka-console-consumer.sh too
> and
> >>> it
> >>> > > fails with same error.
> >>> > >
> >>> > > So was wondering if I can apply any of the suggestion as per
> >>> > >
> >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> >>> > corrupted-records-and-deserialization-errors-poison-pill-messages
> >>> > >
> >>> > > If there is any other was just to get the contents of that message
> it
> >>> > would
> >>> > > be helpful.
> >>> > >
> >>> > > Thanks
> >>> > > Sachin
> >>> > >
> >>> > >
> >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com>
> >>> > wrote:
> >>> > >
> >>> > > > Hi Sachin,
> >>> > > >
> >>> > > > Have you tried firing up a consumer (non-streams), seeking to
> that
> >>> > offset
> >>> > > > on the topic and seeing what the message is? Might be easier to
> >>> debug?
> >>> > > Like
> >>> > > > you say, it is failing in the consumer.
> >>> > > > Thanks,
> >>> > > > Damian
> >>> > > >
> >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com>
> >>> wrote:
> >>> > > >
> >>> > > > > I think I am following the third option.
> >>> > > > >
> >>> > > > > My pipeline is:
> >>> > > > >
> >>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer
> ());
> >>> > > > >
> >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> >>> > > > >   .filter(new Predicate<String, V>() { ...})
> >>> > > > >   .groupByKey()
> >>> > > > >   .aggregate(new Initializer<V1>() {...}, new
> Aggregator<String,
> >>> V,
> >>> > > V1>()
> >>> > > > > {...}, windows, supplier)
> >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> >>> > > > >
> >>> > > > >
> >>> > > > > and In VDeserializer (implements Deserializer<V>) I am doing
> >>> > something
> >>> > > > like
> >>> > > > > this:
> >>> > > > >
> >>> > > > >     public V deserialize(String paramString, byte[]
> >>> > paramArrayOfByte) {
> >>> > > > >         if (paramArrayOfByte == null) { return null;}
> >>> > > > >         V data = null;
> >>> > > > >         try {
> >>> > > > >             data = objectMapper.readValue(paramArrayOfByte,
> new
> >>> > > > > TypeReference<V>() {});
> >>> > > > >         } catch (Exception e) {
> >>> > > > >             e.printStackTrace();
> >>> > > > >         }
> >>> > > > >         return data;
> >>> > > > >     }
> >>> > > > >
> >>> > > > > So I am catching any exception that may happen when
> >>> deserializing the
> >>> > > > data.
> >>> > > > >
> >>> > > > > This is what third option suggest (if I am not mistaken).
> >>> > > > >
> >>> > > > > Please let me know given the pipeline we which option would be
> >>> best
> >>> > and
> >>> > > > how
> >>> > > > > can we incorporate that in our pipeline.
> >>> > > > >
> >>> > > > > Also not exception is happening when reading from source topic
> >>> which
> >>> > is
> >>> > > > > "advice-stream", so looks like flow is not going to pipeline at
> >>> all
> >>> > for
> >>> > > > us
> >>> > > > > to handle. It is terminating right at consumer poll.
> >>> > > > >
> >>> > > > > Thanks
> >>> > > > > Sachin
> >>> > > > >
> >>> > > > >
> >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
> >>> michael@confluent.io>
> >>> > > > > wrote:
> >>> > > > >
> >>> > > > > > Could this be a corrupted message ("poison pill") in your
> >>> topic?
> >>> > > > > >
> >>> > > > > > If so, take a look at
> >>> > > > > > http://docs.confluent.io/current/streams/faq.html#
> >>> > > > > >
> >>> > > > > handling-corrupted-records-and-deserialization-errors-
> >>> > > > poison-pill-messages
> >>> > > > > >
> >>> > > > > > FYI: We're currently investigating a more elegant way to
> >>> address
> >>> > such
> >>> > > > > > poison pill problems.  If you have feedback on that front,
> feel
> >>> > free
> >>> > > to
> >>> > > > > > share it with us. :-)
> >>> > > > > >
> >>> > > > > > -Michael
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> >>> > sjmittal@gmail.com>
> >>> > > > > > wrote:
> >>> > > > > >
> >>> > > > > > > Hi,
> >>> > > > > > > This is for first time we are getting a weird exception.
> >>> > > > > > > After this the streams caches.
> >>> > > > > > >
> >>> > > > > > > Only work around is to manually seek and commit offset to a
> >>> > greater
> >>> > > > > > number
> >>> > > > > > > and we are needing this manual intervention again and
> again.
> >>> > > > > > >
> >>> > > > > > > Any idea what is causing it and how can we circumvent this.
> >>> > > > > > >
> >>> > > > > > > Note this error happens in both cases when 10.2 client or
> >>> 10.1.1
> >>> > > > client
> >>> > > > > > > connect to kafka server 10.1.1
> >>> > > > > > >
> >>> > > > > > > So this does not looks like version issue.
> >>> > > > > > >
> >>> > > > > > > Also we have following setting
> >>> > > > > > > message.max.bytes=5000013
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> >>> > > > > > >
> >>> > > > > > > Rest is all default and also increasing the value for
> >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not
> >>> help.
> >>> > > > > > >
> >>> > > > > > > Stack trace below.
> >>> > > > > > >
> >>> > > > > > > Thanks
> >>> > > > > > > Sachin
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > org.apache.kafka.common.errors.SerializationException:
> Error
> >>> > > > > > deserializing
> >>> > > > > > > key/value for partition advice-stream-6 at offset 45153795
> >>> > > > > > > java.lang.IllegalArgumentException: null
> >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275)
> >>> ~[na:1.8.0_122-ea]
> >>> > > > > > >   at org.apache.kafka.common.utils.
> >>> Utils.sizeDelimited(Utils.
> >>> > > > java:791)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.
> >>> > java:268)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > parseRecord(Fetcher.java:867)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > parseCompletedFetch(Fetcher.java:775)
> >>> > ~[kafka-clients-0.10.2.0.jar:
> >>> > > > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> >>> > > > > > > fetchedRecords(Fetcher.java:473)
> >>> ~[kafka-clients-0.10.2.0.jar:
> >>> > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> >>> > > > > > > pollOnce(KafkaConsumer.java:1062)
> >>> ~[kafka-clients-0.10.2.0.jar:
> >>> > na]
> >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >>> > > > > > > KafkaConsumer.java:995)
> >>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> >>> > > > StreamThread.runLoop(
> >>> > > > > > > StreamThread.java:592)
> >>> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>> > > > > > >   at org.apache.kafka.streams.processor.internals.
> >>> > > > > > > StreamThread.run(StreamThread.java:378)
> >>> > ~[kafka-streams-0.10.2.1-
> >>> > > > > > > SNAPSHOT.jar:na]
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >>
> >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Michael Noll <mi...@confluent.io>.
Sachin,

there's a JIRA that seems related to what you're seeing:
https://issues.apache.org/jira/browse/KAFKA-4740

Perhaps you could check the above and report back?

-Michael




On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <mi...@confluent.io> wrote:

> Hmm, I re-read the stacktrace again. It does look like the value-side
> being the culprit (as Sachin suggested earlier).
>
> -Michael
>
>
> On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
>> Sachin,
>>
>> you have this line:
>>
>> > builder.stream(Serdes.String(), serde, "advice-stream")
>>
>> Could the problem be that not the record values are causing the problem
>> -- because your value deserializer does try-catch any such errors -- but
>> that the record *keys* are malformed?  The built-in `Serdes.String()` does
>> not try-catch deserialization errors, and from a quick look at the source
>> it seems that the `Fetcher` class (clients/src/main/java/org/apa
>> che/kafka/clients/consumer/internals/Fetcher.java) is throwing your
>> error above ("Error deserializing key/value for partition..."), and the
>> Fetcher is swallowing the more specific SerializationException of
>> `String.Serdes()` (but it will include the original exception/Throwable in
>> its own SerializationException).
>>
>> -Michael
>>
>>
>>
>> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com>
>> wrote:
>>
>>> My streams application does run in debug mode only.
>>> Also I have checked the code around these lines
>>>
>>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
>>> ord(Fetcher.java:867)
>>> ~[kafka-clients-0.10.2.0.jar:na]
>>>
>>> I don't see any log statement which will give me more information.
>>>
>>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
>>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>>>
>>> The issue is happening at this line and perhaps handling the exception
>>> and
>>> setting the value to be null may be better options.
>>> Yes at client side nothing can be done because exception is happening
>>> before this.valueDeserializer.deserialize can be called.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com>
>>> wrote:
>>>
>>> > The suggestions in that FAQ won't help as it is too late, i.e., the
>>> message
>>> > has already been received into Streams.
>>> > You could create a simple app that uses the Consumer, seeks to the
>>> offset,
>>> > and tries to read the message. If you did this in debug mode you might
>>> find
>>> > out some more information.
>>> >
>>> >
>>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com> wrote:
>>> >
>>> > > Well I try to read that offset via kafka-console-consumer.sh too and
>>> it
>>> > > fails with same error.
>>> > >
>>> > > So was wondering if I can apply any of the suggestion as per
>>> > >
>>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
>>> > corrupted-records-and-deserialization-errors-poison-pill-messages
>>> > >
>>> > > If there is any other was just to get the contents of that message it
>>> > would
>>> > > be helpful.
>>> > >
>>> > > Thanks
>>> > > Sachin
>>> > >
>>> > >
>>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com>
>>> > wrote:
>>> > >
>>> > > > Hi Sachin,
>>> > > >
>>> > > > Have you tried firing up a consumer (non-streams), seeking to that
>>> > offset
>>> > > > on the topic and seeing what the message is? Might be easier to
>>> debug?
>>> > > Like
>>> > > > you say, it is failing in the consumer.
>>> > > > Thanks,
>>> > > > Damian
>>> > > >
>>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com>
>>> wrote:
>>> > > >
>>> > > > > I think I am following the third option.
>>> > > > >
>>> > > > > My pipeline is:
>>> > > > >
>>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
>>> > > > >
>>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
>>> > > > >   .filter(new Predicate<String, V>() { ...})
>>> > > > >   .groupByKey()
>>> > > > >   .aggregate(new Initializer<V1>() {...}, new Aggregator<String,
>>> V,
>>> > > V1>()
>>> > > > > {...}, windows, supplier)
>>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
>>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
>>> > > > >
>>> > > > >
>>> > > > > and In VDeserializer (implements Deserializer<V>) I am doing
>>> > something
>>> > > > like
>>> > > > > this:
>>> > > > >
>>> > > > >     public V deserialize(String paramString, byte[]
>>> > paramArrayOfByte) {
>>> > > > >         if (paramArrayOfByte == null) { return null;}
>>> > > > >         V data = null;
>>> > > > >         try {
>>> > > > >             data = objectMapper.readValue(paramArrayOfByte, new
>>> > > > > TypeReference<V>() {});
>>> > > > >         } catch (Exception e) {
>>> > > > >             e.printStackTrace();
>>> > > > >         }
>>> > > > >         return data;
>>> > > > >     }
>>> > > > >
>>> > > > > So I am catching any exception that may happen when
>>> deserializing the
>>> > > > data.
>>> > > > >
>>> > > > > This is what third option suggest (if I am not mistaken).
>>> > > > >
>>> > > > > Please let me know given the pipeline we which option would be
>>> best
>>> > and
>>> > > > how
>>> > > > > can we incorporate that in our pipeline.
>>> > > > >
>>> > > > > Also not exception is happening when reading from source topic
>>> which
>>> > is
>>> > > > > "advice-stream", so looks like flow is not going to pipeline at
>>> all
>>> > for
>>> > > > us
>>> > > > > to handle. It is terminating right at consumer poll.
>>> > > > >
>>> > > > > Thanks
>>> > > > > Sachin
>>> > > > >
>>> > > > >
>>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
>>> michael@confluent.io>
>>> > > > > wrote:
>>> > > > >
>>> > > > > > Could this be a corrupted message ("poison pill") in your
>>> topic?
>>> > > > > >
>>> > > > > > If so, take a look at
>>> > > > > > http://docs.confluent.io/current/streams/faq.html#
>>> > > > > >
>>> > > > > handling-corrupted-records-and-deserialization-errors-
>>> > > > poison-pill-messages
>>> > > > > >
>>> > > > > > FYI: We're currently investigating a more elegant way to
>>> address
>>> > such
>>> > > > > > poison pill problems.  If you have feedback on that front, feel
>>> > free
>>> > > to
>>> > > > > > share it with us. :-)
>>> > > > > >
>>> > > > > > -Michael
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
>>> > sjmittal@gmail.com>
>>> > > > > > wrote:
>>> > > > > >
>>> > > > > > > Hi,
>>> > > > > > > This is for first time we are getting a weird exception.
>>> > > > > > > After this the streams caches.
>>> > > > > > >
>>> > > > > > > Only work around is to manually seek and commit offset to a
>>> > greater
>>> > > > > > number
>>> > > > > > > and we are needing this manual intervention again and again.
>>> > > > > > >
>>> > > > > > > Any idea what is causing it and how can we circumvent this.
>>> > > > > > >
>>> > > > > > > Note this error happens in both cases when 10.2 client or
>>> 10.1.1
>>> > > > client
>>> > > > > > > connect to kafka server 10.1.1
>>> > > > > > >
>>> > > > > > > So this does not looks like version issue.
>>> > > > > > >
>>> > > > > > > Also we have following setting
>>> > > > > > > message.max.bytes=5000013
>>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
>>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
>>> > > > > > >
>>> > > > > > > Rest is all default and also increasing the value for
>>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not
>>> help.
>>> > > > > > >
>>> > > > > > > Stack trace below.
>>> > > > > > >
>>> > > > > > > Thanks
>>> > > > > > > Sachin
>>> > > > > > >
>>> > > > > > >
>>> > > > > > > org.apache.kafka.common.errors.SerializationException: Error
>>> > > > > > deserializing
>>> > > > > > > key/value for partition advice-stream-6 at offset 45153795
>>> > > > > > > java.lang.IllegalArgumentException: null
>>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275)
>>> ~[na:1.8.0_122-ea]
>>> > > > > > >   at org.apache.kafka.common.utils.
>>> Utils.sizeDelimited(Utils.
>>> > > > java:791)
>>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.
>>> > java:268)
>>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
>>> > > > > > > parseRecord(Fetcher.java:867)
>>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
>>> > > > > > > parseCompletedFetch(Fetcher.java:775)
>>> > ~[kafka-clients-0.10.2.0.jar:
>>> > > > na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
>>> > > > > > > fetchedRecords(Fetcher.java:473)
>>> ~[kafka-clients-0.10.2.0.jar:
>>> > na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
>>> > > > > > > pollOnce(KafkaConsumer.java:1062)
>>> ~[kafka-clients-0.10.2.0.jar:
>>> > na]
>>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>> > > > > > > KafkaConsumer.java:995)
>>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
>>> > > > > > >   at org.apache.kafka.streams.processor.internals.
>>> > > > StreamThread.runLoop(
>>> > > > > > > StreamThread.java:592)
>>> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>> > > > > > >   at org.apache.kafka.streams.processor.internals.
>>> > > > > > > StreamThread.run(StreamThread.java:378)
>>> > ~[kafka-streams-0.10.2.1-
>>> > > > > > > SNAPSHOT.jar:na]
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>>
>
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Michael Noll <mi...@confluent.io>.
Hmm, I re-read the stacktrace again. It does look like the value side is
the culprit (as Sachin suggested earlier).

-Michael


On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <mi...@confluent.io> wrote:

> Sachin,
>
> you have this line:
>
> > builder.stream(Serdes.String(), serde, "advice-stream")
>
> Could the problem be that not the record values are causing the problem --
> because your value deserializer does try-catch any such errors -- but that
> the record *keys* are malformed?  The built-in `Serdes.String()` does not
> try-catch deserialization errors, and from a quick look at the source it
> seems that the `Fetcher` class (clients/src/main/java/org/
> apache/kafka/clients/consumer/internals/Fetcher.java) is throwing your
> error above ("Error deserializing key/value for partition..."), and the
> Fetcher is swallowing the more specific SerializationException of
> `String.Serdes()` (but it will include the original exception/Throwable in
> its own SerializationException).
>
> -Michael
>
>
>
> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com> wrote:
>
>> My streams application does run in debug mode only.
>> Also I have checked the code around these lines
>>
>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>   at org.apache.kafka.common.record.Record.value(Record.java:268)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRec
>> ord(Fetcher.java:867)
>> ~[kafka-clients-0.10.2.0.jar:na]
>>
>> I don't see any log statement which will give me more information.
>>
>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main
>> /java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>>
>> The issue is happening at this line and perhaps handling the exception and
>> setting the value to be null may be better options.
>> Yes at client side nothing can be done because exception is happening
>> before this.valueDeserializer.deserialize can be called.
>>
>> Thanks
>> Sachin
>>
>>
>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com> wrote:
>>
>> > The suggestions in that FAQ won't help as it is too late, i.e., the
>> message
>> > has already been received into Streams.
>> > You could create a simple app that uses the Consumer, seeks to the
>> offset,
>> > and tries to read the message. If you did this in debug mode you might
>> find
>> > out some more information.
>> >
>> >
>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com> wrote:
>> >
>> > > Well I try to read that offset via kafka-console-consumer.sh too and
>> it
>> > > fails with same error.
>> > >
>> > > So was wondering if I can apply any of the suggestion as per
>> > >
>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
>> > corrupted-records-and-deserialization-errors-poison-pill-messages
>> > >
>> > > If there is any other was just to get the contents of that message it
>> > would
>> > > be helpful.
>> > >
>> > > Thanks
>> > > Sachin
>> > >
>> > >
>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi Sachin,
>> > > >
>> > > > Have you tried firing up a consumer (non-streams), seeking to that
>> > offset
>> > > > on the topic and seeing what the message is? Might be easier to
>> debug?
>> > > Like
>> > > > you say, it is failing in the consumer.
>> > > > Thanks,
>> > > > Damian
>> > > >
>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com>
>> wrote:
>> > > >
>> > > > > I think I am following the third option.
>> > > > >
>> > > > > My pipeline is:
>> > > > >
>> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
>> > > > >
>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
>> > > > >   .filter(new Predicate<String, V>() { ...})
>> > > > >   .groupByKey()
>> > > > >   .aggregate(new Initializer<V1>() {...}, new Aggregator<String,
>> V,
>> > > V1>()
>> > > > > {...}, windows, supplier)
>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
>> > > > >
>> > > > >
>> > > > > and In VDeserializer (implements Deserializer<V>) I am doing
>> > something
>> > > > like
>> > > > > this:
>> > > > >
>> > > > >     public V deserialize(String paramString, byte[]
>> > paramArrayOfByte) {
>> > > > >         if (paramArrayOfByte == null) { return null;}
>> > > > >         V data = null;
>> > > > >         try {
>> > > > >             data = objectMapper.readValue(paramArrayOfByte, new
>> > > > > TypeReference<V>() {});
>> > > > >         } catch (Exception e) {
>> > > > >             e.printStackTrace();
>> > > > >         }
>> > > > >         return data;
>> > > > >     }
>> > > > >
>> > > > > So I am catching any exception that may happen when deserializing
>> the
>> > > > data.
>> > > > >
>> > > > > This is what third option suggest (if I am not mistaken).
>> > > > >
>> > > > > Please let me know given the pipeline we which option would be
>> best
>> > and
>> > > > how
>> > > > > can we incorporate that in our pipeline.
>> > > > >
>> > > > > Also not exception is happening when reading from source topic
>> which
>> > is
>> > > > > "advice-stream", so looks like flow is not going to pipeline at
>> all
>> > for
>> > > > us
>> > > > > to handle. It is terminating right at consumer poll.
>> > > > >
>> > > > > Thanks
>> > > > > Sachin
>> > > > >
>> > > > >
>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
>> michael@confluent.io>
>> > > > > wrote:
>> > > > >
>> > > > > > Could this be a corrupted message ("poison pill") in your topic?
>> > > > > >
>> > > > > > If so, take a look at
>> > > > > > http://docs.confluent.io/current/streams/faq.html#
>> > > > > >
>> > > > > handling-corrupted-records-and-deserialization-errors-
>> > > > poison-pill-messages
>> > > > > >
>> > > > > > FYI: We're currently investigating a more elegant way to address
>> > such
>> > > > > > poison pill problems.  If you have feedback on that front, feel
>> > free
>> > > to
>> > > > > > share it with us. :-)
>> > > > > >
>> > > > > > -Michael
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
>> > sjmittal@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi,
>> > > > > > > This is for first time we are getting a weird exception.
>> > > > > > > After this the streams caches.
>> > > > > > >
>> > > > > > > Only work around is to manually seek and commit offset to a
>> > greater
>> > > > > > number
>> > > > > > > and we are needing this manual intervention again and again.
>> > > > > > >
>> > > > > > > Any idea what is causing it and how can we circumvent this.
>> > > > > > >
>> > > > > > > Note this error happens in both cases when 10.2 client or
>> 10.1.1
>> > > > client
>> > > > > > > connect to kafka server 10.1.1
>> > > > > > >
>> > > > > > > So this does not looks like version issue.
>> > > > > > >
>> > > > > > > Also we have following setting
>> > > > > > > message.max.bytes=5000013
>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
>> > > > > > >
>> > > > > > > Rest is all default and also increasing the value for
>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
>> > > > > > >
>> > > > > > > Stack trace below.
>> > > > > > >
>> > > > > > > Thanks
>> > > > > > > Sachin
>> > > > > > >
>> > > > > > >
>> > > > > > > org.apache.kafka.common.errors.SerializationException: Error
>> > > > > > deserializing
>> > > > > > > key/value for partition advice-stream-6 at offset 45153795
>> > > > > > > java.lang.IllegalArgumentException: null
>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275)
>> ~[na:1.8.0_122-ea]
>> > > > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
>> > > > java:791)
>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.
>> > java:268)
>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
>> > > > > > > parseRecord(Fetcher.java:867)
>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
>> > > > > > > parseCompletedFetch(Fetcher.java:775)
>> > ~[kafka-clients-0.10.2.0.jar:
>> > > > na]
>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
>> > > > > > > fetchedRecords(Fetcher.java:473)
>> ~[kafka-clients-0.10.2.0.jar:
>> > na]
>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
>> > > > > > > pollOnce(KafkaConsumer.java:1062)
>> ~[kafka-clients-0.10.2.0.jar:
>> > na]
>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>> > > > > > > KafkaConsumer.java:995)
>> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
>> > > > > > >   at org.apache.kafka.streams.processor.internals.
>> > > > StreamThread.runLoop(
>> > > > > > > StreamThread.java:592)
>> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> > > > > > >   at org.apache.kafka.streams.processor.internals.
>> > > > > > > StreamThread.run(StreamThread.java:378)
>> > ~[kafka-streams-0.10.2.1-
>> > > > > > > SNAPSHOT.jar:na]
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Michael Noll <mi...@confluent.io>.
Sachin,

you have this line:

> builder.stream(Serdes.String(), serde, "advice-stream")

Could it be that the record values are not the problem at all -- your value
deserializer does try-catch any such errors -- but that the record *keys* are
malformed?  The built-in `Serdes.String()` does not try-catch deserialization
errors, and from a quick look at the source it seems that the `Fetcher` class
(clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
is throwing the error above ("Error deserializing key/value for
partition..."), and the Fetcher swallows the more specific
SerializationException of `Serdes.String()` (but it will include the
original exception/Throwable in its own SerializationException).
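
If the keys do turn out to be the issue, one workaround (a sketch only; the
class name SafeStringDeserializer is mine and this is not part of Kafka) would
be a key deserializer that logs and returns null instead of throwing, similar
to what your VDeserializer already does for values:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative only: a lenient alternative to the stock StringDeserializer.
public class SafeStringDeserializer implements Deserializer<String> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return new String(data, StandardCharsets.UTF_8);
        } catch (Exception e) {
            e.printStackTrace(); // mirror the approach used in VDeserializer
            return null;
        }
    }

    @Override
    public void close() { }
}

Note, though, that if the failure really happens inside the Fetcher before
deserialize() is called (as Sachin notes), a lenient deserializer would not
help either.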

-Michael



On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sj...@gmail.com> wrote:

> My streams application does run in debug mode only.
> Also I have checked the code around these lines
>
>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseRecord(Fetcher.java:867)
> ~[kafka-clients-0.10.2.0.jar:na]
>
> I don't see any log statement which will give me more information.
>
> https://github.com/apache/kafka/blob/0.10.2/clients/src/
> main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
>
> The issue is happening at this line and perhaps handling the exception and
> setting the value to be null may be better options.
> Yes at client side nothing can be done because exception is happening
> before this.valueDeserializer.deserialize can be called.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com> wrote:
>
> > The suggestions in that FAQ won't help as it is too late, i.e., the
> message
> > has already been received into Streams.
> > You could create a simple app that uses the Consumer, seeks to the
> offset,
> > and tries to read the message. If you did this in debug mode you might
> find
> > out some more information.
> >
> >
> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com> wrote:
> >
> > > Well I try to read that offset via kafka-console-consumer.sh too and it
> > > fails with same error.
> > >
> > > So was wondering if I can apply any of the suggestion as per
> > >
> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> > corrupted-records-and-deserialization-errors-poison-pill-messages
> > >
> > > If there is any other was just to get the contents of that message it
> > would
> > > be helpful.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com>
> > wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Have you tried firing up a consumer (non-streams), seeking to that
> > offset
> > > > on the topic and seeing what the message is? Might be easier to
> debug?
> > > Like
> > > > you say, it is failing in the consumer.
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com>
> wrote:
> > > >
> > > > > I think I am following the third option.
> > > > >
> > > > > My pipeline is:
> > > > >
> > > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> > > > >
> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > > >   .filter(new Predicate<String, V>() { ...})
> > > > >   .groupByKey()
> > > > >   .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V,
> > > V1>()
> > > > > {...}, windows, supplier)
> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> > > > >
> > > > >
> > > > > and In VDeserializer (implements Deserializer<V>) I am doing
> > something
> > > > like
> > > > > this:
> > > > >
> > > > >     public V deserialize(String paramString, byte[]
> > paramArrayOfByte) {
> > > > >         if (paramArrayOfByte == null) { return null;}
> > > > >         V data = null;
> > > > >         try {
> > > > >             data = objectMapper.readValue(paramArrayOfByte, new
> > > > > TypeReference<V>() {});
> > > > >         } catch (Exception e) {
> > > > >             e.printStackTrace();
> > > > >         }
> > > > >         return data;
> > > > >     }
> > > > >
> > > > > So I am catching any exception that may happen when deserializing
> the
> > > > data.
> > > > >
> > > > > This is what third option suggest (if I am not mistaken).
> > > > >
> > > > > Please let me know given the pipeline we which option would be best
> > and
> > > > how
> > > > > can we incorporate that in our pipeline.
> > > > >
> > > > > Also not exception is happening when reading from source topic
> which
> > is
> > > > > "advice-stream", so looks like flow is not going to pipeline at all
> > for
> > > > us
> > > > > to handle. It is terminating right at consumer poll.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <
> michael@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Could this be a corrupted message ("poison pill") in your topic?
> > > > > >
> > > > > > If so, take a look at
> > > > > > http://docs.confluent.io/current/streams/faq.html#
> > > > > >
> > > > > handling-corrupted-records-and-deserialization-errors-
> > > > poison-pill-messages
> > > > > >
> > > > > > FYI: We're currently investigating a more elegant way to address
> > such
> > > > > > poison pill problems.  If you have feedback on that front, feel
> > free
> > > to
> > > > > > share it with us. :-)
> > > > > >
> > > > > > -Michael
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> > sjmittal@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > This is for first time we are getting a weird exception.
> > > > > > > After this the streams caches.
> > > > > > >
> > > > > > > Only work around is to manually seek and commit offset to a
> > greater
> > > > > > number
> > > > > > > and we are needing this manual intervention again and again.
> > > > > > >
> > > > > > > Any idea what is causing it and how can we circumvent this.
> > > > > > >
> > > > > > > Note this error happens in both cases when 10.2 client or
> 10.1.1
> > > > client
> > > > > > > connect to kafka server 10.1.1
> > > > > > >
> > > > > > > So this does not looks like version issue.
> > > > > > >
> > > > > > > Also we have following setting
> > > > > > > message.max.bytes=5000013
> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > > > > >
> > > > > > > Rest is all default and also increasing the value for
> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > > > > > >
> > > > > > > Stack trace below.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > > >
> > > > > > > org.apache.kafka.common.errors.SerializationException: Error
> > > > > > deserializing
> > > > > > > key/value for partition advice-stream-6 at offset 45153795
> > > > > > > java.lang.IllegalArgumentException: null
> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> > > > java:791)
> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.
> > java:268)
> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > > > parseRecord(Fetcher.java:867)
> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > > > parseCompletedFetch(Fetcher.java:775)
> > ~[kafka-clients-0.10.2.0.jar:
> > > > na]
> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:
> > na]
> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > > > > pollOnce(KafkaConsumer.java:1062)
> ~[kafka-clients-0.10.2.0.jar:
> > na]
> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > > > > > KafkaConsumer.java:995)
> > > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.
> > > > StreamThread.runLoop(
> > > > > > > StreamThread.java:592)
> > > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.
> > > > > > > StreamThread.run(StreamThread.java:378)
> > ~[kafka-streams-0.10.2.1-
> > > > > > > SNAPSHOT.jar:na]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Sachin Mittal <sj...@gmail.com>.
My streams application already runs in debug mode.
Also, I have checked the code around these lines:

  at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.common.record.Record.value(Record.java:268)
~[kafka-clients-0.10.2.0.jar:na]
  at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867)
~[kafka-clients-0.10.2.0.jar:na]

I don't see any log statement that would give me more information.

https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867

The issue is happening at this line, and perhaps handling the exception and
setting the value to null would be a better option.
Yes, on the client side nothing can be done, because the exception happens
before this.valueDeserializer.deserialize can be called.
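
To make that concrete, here is a rough sketch of what tolerating a corrupt
size-delimited value could look like. The sizeDelimited() method below is a
simplified stand-in for org.apache.kafka.common.utils.Utils.sizeDelimited,
not the actual source, and the class/method names are mine:

import java.nio.ByteBuffer;

public class LenientValueExtraction {

    // Roughly what the extraction does today: read a 4-byte length prefix,
    // then limit the slice to that length. A corrupt length makes limit()
    // throw IllegalArgumentException.
    static ByteBuffer sizeDelimited(ByteBuffer buffer, int start) {
        int size = buffer.getInt(start);
        if (size < 0) {
            return null;
        }
        ByteBuffer b = buffer.duplicate();
        b.position(start + 4);
        b = b.slice();
        b.limit(size); // fails when size > remaining bytes
        b.rewind();
        return b;
    }

    // Proposed behaviour: treat a corrupt record as a null value instead of
    // failing the whole fetch, and let the higher-level client decide.
    static ByteBuffer lenientSizeDelimited(ByteBuffer buffer, int start) {
        try {
            return sizeDelimited(buffer, start);
        } catch (IllegalArgumentException e) {
            return null;
        }
    }
}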

Thanks
Sachin


On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <da...@gmail.com> wrote:

> The suggestions in that FAQ won't help as it is too late, i.e., the message
> has already been received into Streams.
> You could create a simple app that uses the Consumer, seeks to the offset,
> and tries to read the message. If you did this in debug mode you might find
> out some more information.
>
>
> On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com> wrote:
>
> > Well I try to read that offset via kafka-console-consumer.sh too and it
> > fails with same error.
> >
> > So was wondering if I can apply any of the suggestion as per
> >
> > http://docs.confluent.io/3.2.0/streams/faq.html#handling-
> corrupted-records-and-deserialization-errors-poison-pill-messages
> >
> > If there is any other was just to get the contents of that message it
> would
> > be helpful.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com>
> wrote:
> >
> > > Hi Sachin,
> > >
> > > Have you tried firing up a consumer (non-streams), seeking to that
> offset
> > > on the topic and seeing what the message is? Might be easier to debug?
> > Like
> > > you say, it is failing in the consumer.
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com> wrote:
> > >
> > > > I think I am following the third option.
> > > >
> > > > My pipeline is:
> > > >
> > > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> > > >
> > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > >   .filter(new Predicate<String, V>() { ...})
> > > >   .groupByKey()
> > > >   .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V,
> > V1>()
> > > > {...}, windows, supplier)
> > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> > > >
> > > >
> > > > and In VDeserializer (implements Deserializer<V>) I am doing
> something
> > > like
> > > > this:
> > > >
> > > >     public V deserialize(String paramString, byte[]
> paramArrayOfByte) {
> > > >         if (paramArrayOfByte == null) { return null;}
> > > >         V data = null;
> > > >         try {
> > > >             data = objectMapper.readValue(paramArrayOfByte, new
> > > > TypeReference<V>() {});
> > > >         } catch (Exception e) {
> > > >             e.printStackTrace();
> > > >         }
> > > >         return data;
> > > >     }
> > > >
> > > > So I am catching any exception that may happen when deserializing the
> > > data.
> > > >
> > > > This is what third option suggest (if I am not mistaken).
> > > >
> > > > Please let me know given the pipeline we which option would be best
> and
> > > how
> > > > can we incorporate that in our pipeline.
> > > >
> > > > Also not exception is happening when reading from source topic which
> is
> > > > "advice-stream", so looks like flow is not going to pipeline at all
> for
> > > us
> > > > to handle. It is terminating right at consumer poll.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mi...@confluent.io>
> > > > wrote:
> > > >
> > > > > Could this be a corrupted message ("poison pill") in your topic?
> > > > >
> > > > > If so, take a look at
> > > > > http://docs.confluent.io/current/streams/faq.html#
> > > > >
> > > > handling-corrupted-records-and-deserialization-errors-
> > > poison-pill-messages
> > > > >
> > > > > FYI: We're currently investigating a more elegant way to address
> such
> > > > > poison pill problems.  If you have feedback on that front, feel
> free
> > to
> > > > > share it with us. :-)
> > > > >
> > > > > -Michael
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <
> sjmittal@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > This is for first time we are getting a weird exception.
> > > > > > After this the streams caches.
> > > > > >
> > > > > > Only work around is to manually seek and commit offset to a
> greater
> > > > > number
> > > > > > and we are needing this manual intervention again and again.
> > > > > >
> > > > > > Any idea what is causing it and how can we circumvent this.
> > > > > >
> > > > > > Note this error happens in both cases when 10.2 client or 10.1.1
> > > client
> > > > > > connect to kafka server 10.1.1
> > > > > >
> > > > > > So this does not looks like version issue.
> > > > > >
> > > > > > Also we have following setting
> > > > > > message.max.bytes=5000013
> > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > > > >
> > > > > > Rest is all default and also increasing the value for
> > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > > > > >
> > > > > > Stack trace below.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > > >
> > > > > > org.apache.kafka.common.errors.SerializationException: Error
> > > > > deserializing
> > > > > > key/value for partition advice-stream-6 at offset 45153795
> > > > > > java.lang.IllegalArgumentException: null
> > > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> > > java:791)
> > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > >   at org.apache.kafka.common.record.Record.value(Record.
> java:268)
> > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > > parseRecord(Fetcher.java:867)
> > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > > parseCompletedFetch(Fetcher.java:775)
> ~[kafka-clients-0.10.2.0.jar:
> > > na]
> > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:
> na]
> > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > > > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:
> na]
> > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > > > > KafkaConsumer.java:995)
> > > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > > >   at org.apache.kafka.streams.processor.internals.
> > > StreamThread.runLoop(
> > > > > > StreamThread.java:592)
> > > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > > > >   at org.apache.kafka.streams.processor.internals.
> > > > > > StreamThread.run(StreamThread.java:378)
> ~[kafka-streams-0.10.2.1-
> > > > > > SNAPSHOT.jar:na]
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Damian Guy <da...@gmail.com>.
The suggestions in that FAQ won't help as it is too late, i.e., the message
has already been received into Streams.
You could create a simple app that uses the Consumer, seeks to the offset,
and tries to read the message (a sketch follows below). If you run it in
debug mode you might find out some more information.
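
A minimal sketch of such an app (the bootstrap servers and group id below are
placeholders; the topic, partition and offset are taken from the stack trace):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class InspectOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "inspect-offset");
        props.put("enable.auto.commit", "false");

        // Read raw bytes so no custom deserialization gets in the way.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            TopicPartition tp = new TopicPartition("advice-stream", 6);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 45153795L);

            ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.printf("offset=%d keyBytes=%d valueBytes=%d%n",
                        record.offset(),
                        record.key() == null ? -1 : record.key().length,
                        record.value() == null ? -1 : record.value().length);
            }
        }
    }
}

If the record is corrupt at the Fetcher level, this consumer may hit the same
SerializationException even with ByteArrayDeserializer, which would at least
confirm where the failure sits.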


On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sj...@gmail.com> wrote:

> Well I try to read that offset via kafka-console-consumer.sh too and it
> fails with same error.
>
> So was wondering if I can apply any of the suggestion as per
>
> http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>
> If there is any other was just to get the contents of that message it would
> be helpful.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > Have you tried firing up a consumer (non-streams), seeking to that offset
> > on the topic and seeing what the message is? Might be easier to debug?
> Like
> > you say, it is failing in the consumer.
> > Thanks,
> > Damian
> >
> > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com> wrote:
> >
> > > I think I am following the third option.
> > >
> > > My pipeline is:
> > >
> > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> > >
> > > builder.stream(Serdes.String(), serde, "advice-stream")
> > >   .filter(new Predicate<String, V>() { ...})
> > >   .groupByKey()
> > >   .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V,
> V1>()
> > > {...}, windows, supplier)
> > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> > >
> > >
> > > and In VDeserializer (implements Deserializer<V>) I am doing something
> > like
> > > this:
> > >
> > >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
> > >         if (paramArrayOfByte == null) { return null;}
> > >         V data = null;
> > >         try {
> > >             data = objectMapper.readValue(paramArrayOfByte, new
> > > TypeReference<V>() {});
> > >         } catch (Exception e) {
> > >             e.printStackTrace();
> > >         }
> > >         return data;
> > >     }
> > >
> > > So I am catching any exception that may happen when deserializing the
> > data.
> > >
> > > This is what third option suggest (if I am not mistaken).
> > >
> > > Please let me know given the pipeline we which option would be best and
> > how
> > > can we incorporate that in our pipeline.
> > >
> > > Also not exception is happening when reading from source topic which is
> > > "advice-stream", so looks like flow is not going to pipeline at all for
> > us
> > > to handle. It is terminating right at consumer poll.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mi...@confluent.io>
> > > wrote:
> > >
> > > > Could this be a corrupted message ("poison pill") in your topic?
> > > >
> > > > If so, take a look at
> > > > http://docs.confluent.io/current/streams/faq.html#
> > > >
> > > handling-corrupted-records-and-deserialization-errors-
> > poison-pill-messages
> > > >
> > > > FYI: We're currently investigating a more elegant way to address such
> > > > poison pill problems.  If you have feedback on that front, feel free
> to
> > > > share it with us. :-)
> > > >
> > > > -Michael
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sj...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > This is for first time we are getting a weird exception.
> > > > > After this the streams caches.
> > > > >
> > > > > Only work around is to manually seek and commit offset to a greater
> > > > number
> > > > > and we are needing this manual intervention again and again.
> > > > >
> > > > > Any idea what is causing it and how can we circumvent this.
> > > > >
> > > > > Note this error happens in both cases when 10.2 client or 10.1.1
> > client
> > > > > connect to kafka server 10.1.1
> > > > >
> > > > > So this does not looks like version issue.
> > > > >
> > > > > Also we have following setting
> > > > > message.max.bytes=5000013
> > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > > >
> > > > > Rest is all default and also increasing the value for
> > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > > > >
> > > > > Stack trace below.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > org.apache.kafka.common.errors.SerializationException: Error
> > > > deserializing
> > > > > key/value for partition advice-stream-6 at offset 45153795
> > > > > java.lang.IllegalArgumentException: null
> > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> > java:791)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > parseRecord(Fetcher.java:867)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:
> > na]
> > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > > > KafkaConsumer.java:995)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.streams.processor.internals.
> > StreamThread.runLoop(
> > > > > StreamThread.java:592)
> > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > > >   at org.apache.kafka.streams.processor.internals.
> > > > > StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> > > > > SNAPSHOT.jar:na]
> > > > >
> > > >
> > >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Sachin Mittal <sj...@gmail.com>.
Well, I tried to read that offset via kafka-console-consumer.sh too, and it
fails with the same error.

So I was wondering if I can apply any of the suggestions as per
http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

If there is any other way just to get the contents of that message, it would
be helpful.
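
(For reference, something along these lines is what I mean by reading that
offset with the console consumer; the broker address is a placeholder, and it
assumes the console consumer in this Kafka version supports the --partition
and --offset flags.)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic advice-stream --partition 6 --offset 45153795 --max-messages 1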

Thanks
Sachin


On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Sachin,
>
> Have you tried firing up a consumer (non-streams), seeking to that offset
> on the topic and seeing what the message is? Might be easier to debug? Like
> you say, it is failing in the consumer.
> Thanks,
> Damian
>
> On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com> wrote:
>
> > I think I am following the third option.
> >
> > My pipeline is:
> >
> > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> >
> > builder.stream(Serdes.String(), serde, "advice-stream")
> >   .filter(new Predicate<String, V>() { ...})
> >   .groupByKey()
> >   .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V, V1>()
> > {...}, windows, supplier)
> >   .mapValues(new ValueMapper<V1, V2>() { ... })
> >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> >
> >
> > and In VDeserializer (implements Deserializer<V>) I am doing something
> like
> > this:
> >
> >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
> >         if (paramArrayOfByte == null) { return null;}
> >         V data = null;
> >         try {
> >             data = objectMapper.readValue(paramArrayOfByte, new
> > TypeReference<V>() {});
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >         return data;
> >     }
> >
> > So I am catching any exception that may happen when deserializing the
> data.
> >
> > This is what third option suggest (if I am not mistaken).
> >
> > Please let me know given the pipeline we which option would be best and
> how
> > can we incorporate that in our pipeline.
> >
> > Also not exception is happening when reading from source topic which is
> > "advice-stream", so looks like flow is not going to pipeline at all for
> us
> > to handle. It is terminating right at consumer poll.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mi...@confluent.io>
> > wrote:
> >
> > > Could this be a corrupted message ("poison pill") in your topic?
> > >
> > > If so, take a look at
> > > http://docs.confluent.io/current/streams/faq.html#
> > >
> > handling-corrupted-records-and-deserialization-errors-
> poison-pill-messages
> > >
> > > FYI: We're currently investigating a more elegant way to address such
> > > poison pill problems.  If you have feedback on that front, feel free to
> > > share it with us. :-)
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sj...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > > This is for first time we are getting a weird exception.
> > > > After this the streams caches.
> > > >
> > > > Only work around is to manually seek and commit offset to a greater
> > > number
> > > > and we are needing this manual intervention again and again.
> > > >
> > > > Any idea what is causing it and how can we circumvent this.
> > > >
> > > > Note this error happens in both cases when 10.2 client or 10.1.1
> client
> > > > connect to kafka server 10.1.1
> > > >
> > > > So this does not looks like version issue.
> > > >
> > > > Also we have following setting
> > > > message.max.bytes=5000013
> > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > >
> > > > Rest is all default and also increasing the value for
> > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > > >
> > > > Stack trace below.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > org.apache.kafka.common.errors.SerializationException: Error
> > > deserializing
> > > > key/value for partition advice-stream-6 at offset 45153795
> > > > java.lang.IllegalArgumentException: null
> > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> java:791)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > parseRecord(Fetcher.java:867)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:
> na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > > KafkaConsumer.java:995)
> > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.streams.processor.internals.
> StreamThread.runLoop(
> > > > StreamThread.java:592)
> > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > >   at org.apache.kafka.streams.processor.internals.
> > > > StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> > > > SNAPSHOT.jar:na]
> > > >
> > >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Damian Guy <da...@gmail.com>.
Hi Sachin,

Have you tried firing up a consumer (non-streams), seeking to that offset
on the topic and seeing what the message is? Might be easier to debug? Like
you say, it is failing in the consumer.
Thanks,
Damian

On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sj...@gmail.com> wrote:

> I think I am following the third option.
>
> My pipeline is:
>
> serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
>
> builder.stream(Serdes.String(), serde, "advice-stream")
>   .filter(new Predicate<String, V>() { ...})
>   .groupByKey()
>   .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V, V1>()
> {...}, windows, supplier)
>   .mapValues(new ValueMapper<V1, V2>() { ... })
>   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
>
>
> and In VDeserializer (implements Deserializer<V>) I am doing something like
> this:
>
>     public V deserialize(String paramString, byte[] paramArrayOfByte) {
>         if (paramArrayOfByte == null) { return null;}
>         V data = null;
>         try {
>             data = objectMapper.readValue(paramArrayOfByte, new
> TypeReference<V>() {});
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>         return data;
>     }
>
> So I am catching any exception that may happen when deserializing the data.
>
> This is what third option suggest (if I am not mistaken).
>
> Please let me know given the pipeline we which option would be best and how
> can we incorporate that in our pipeline.
>
> Also not exception is happening when reading from source topic which is
> "advice-stream", so looks like flow is not going to pipeline at all for us
> to handle. It is terminating right at consumer poll.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > Could this be a corrupted message ("poison pill") in your topic?
> >
> > If so, take a look at
> > http://docs.confluent.io/current/streams/faq.html#
> >
> handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> >
> > FYI: We're currently investigating a more elegant way to address such
> > poison pill problems.  If you have feedback on that front, feel free to
> > share it with us. :-)
> >
> > -Michael
> >
> >
> >
> >
> > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sj...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > This is for first time we are getting a weird exception.
> > > After this the streams caches.
> > >
> > > Only work around is to manually seek and commit offset to a greater
> > number
> > > and we are needing this manual intervention again and again.
> > >
> > > Any idea what is causing it and how can we circumvent this.
> > >
> > > Note this error happens in both cases when 10.2 client or 10.1.1 client
> > > connect to kafka server 10.1.1
> > >
> > > So this does not looks like version issue.
> > >
> > > Also we have following setting
> > > message.max.bytes=5000013
> > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > >
> > > Rest is all default and also increasing the value for
> > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > >
> > > Stack trace below.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > org.apache.kafka.common.errors.SerializationException: Error
> > deserializing
> > > key/value for partition advice-stream-6 at offset 45153795
> > > java.lang.IllegalArgumentException: null
> > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > parseRecord(Fetcher.java:867)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > KafkaConsumer.java:995)
> > > ~[kafka-clients-0.10.2.0.jar:na]
> > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > StreamThread.java:592)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > >   at org.apache.kafka.streams.processor.internals.
> > > StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> > > SNAPSHOT.jar:na]
> > >
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Sachin Mittal <sj...@gmail.com>.
I think I am following the third option.

My pipeline is:

serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());

builder.stream(Serdes.String(), serde, "advice-stream")
  .filter(new Predicate<String, V>() { ...})
  .groupByKey()
  .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V, V1>()
{...}, windows, supplier)
  .mapValues(new ValueMapper<V1, V2>() { ... })
  .foreach(new ForeachAction<Windowed<String>, V2>() {... });


and in VDeserializer (which implements Deserializer<V>) I am doing something
like this:

    public V deserialize(String paramString, byte[] paramArrayOfByte) {
        if (paramArrayOfByte == null) { return null;}
        V data = null;
        try {
            data = objectMapper.readValue(paramArrayOfByte, new
TypeReference<V>() {});
        } catch (Exception e) {
            e.printStackTrace();
        }
        return data;
    }

So I am catching any exception that may happen when deserializing the data.

This is what the third option suggests (if I am not mistaken).
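
To make that concrete, here is a rough sketch (untested, and it only helps if
the record actually reaches the deserializer) of how the nulls returned by the
catch block could be dropped at the head of the pipeline; it reuses the V and
serde names from the snippet above:

builder.stream(Serdes.String(), serde, "advice-stream")
  // drop records the deserializer could not parse (it returned null for them)
  .filter(new Predicate<String, V>() {
      @Override
      public boolean test(String key, V value) {
          return value != null;
      }
  })
  // ... then groupByKey(), aggregate(...), mapValues(...), foreach(...) as above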

Please let me know, given the pipeline, which option would be best and how
we can incorporate it into our pipeline.

Also note the exception is happening when reading from the source topic,
which is "advice-stream", so it looks like the flow is not reaching the
pipeline at all for us to handle. It is terminating right at the consumer poll.

Thanks
Sachin


On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <mi...@confluent.io> wrote:

> Could this be a corrupted message ("poison pill") in your topic?
>
> If so, take a look at
> http://docs.confluent.io/current/streams/faq.html#
> handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>
> FYI: We're currently investigating a more elegant way to address such
> poison pill problems.  If you have feedback on that front, feel free to
> share it with us. :-)
>
> -Michael
>
>
>
>
> On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sj...@gmail.com>
> wrote:
>
> > Hi,
> > This is for first time we are getting a weird exception.
> > After this the streams caches.
> >
> > Only work around is to manually seek and commit offset to a greater
> number
> > and we are needing this manual intervention again and again.
> >
> > Any idea what is causing it and how can we circumvent this.
> >
> > Note this error happens in both cases when 10.2 client or 10.1.1 client
> > connect to kafka server 10.1.1
> >
> > So this does not looks like version issue.
> >
> > Also we have following setting
> > message.max.bytes=5000013
> > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> >
> > Rest is all default and also increasing the value for
> > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> >
> > Stack trace below.
> >
> > Thanks
> > Sachin
> >
> >
> > org.apache.kafka.common.errors.SerializationException: Error
> deserializing
> > key/value for partition advice-stream-6 at offset 45153795
> > java.lang.IllegalArgumentException: null
> >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> > ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > parseRecord(Fetcher.java:867)
> > ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > KafkaConsumer.java:995)
> > ~[kafka-clients-0.10.2.0.jar:na]
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:592)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >   at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> > SNAPSHOT.jar:na]
> >
>

Re: weird SerializationException when consumer is fetching and parsing record in streams application

Posted by Michael Noll <mi...@confluent.io>.
Could this be a corrupted message ("poison pill") in your topic?

If so, take a look at
http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

FYI: We're currently investigating a more elegant way to address such
poison pill problems.  If you have feedback on that front, feel free to
share it with us. :-)

-Michael




On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sj...@gmail.com> wrote:

> Hi,
> This is for first time we are getting a weird exception.
> After this the streams caches.
>
> Only work around is to manually seek and commit offset to a greater number
> and we are needing this manual intervention again and again.
>
> Any idea what is causing it and how can we circumvent this.
>
> Note this error happens in both cases when 10.2 client or 10.1.1 client
> connect to kafka server 10.1.1
>
> So this does not looks like version issue.
>
> Also we have following setting
> message.max.bytes=5000013
> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
>
> Rest is all default and also increasing the value for
> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
>
> Stack trace below.
>
> Thanks
> Sachin
>
>
> org.apache.kafka.common.errors.SerializationException: Error deserializing
> key/value for partition advice-stream-6 at offset 45153795
> java.lang.IllegalArgumentException: null
>   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.common.record.Record.value(Record.java:268)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseRecord(Fetcher.java:867)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.internals.Fetcher.
> fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> ~[kafka-clients-0.10.2.0.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> SNAPSHOT.jar:na]
>