Posted to users@kafka.apache.org by Pushkar Deole <pd...@gmail.com> on 2020/06/18 07:54:51 UTC

kafka consumer thread crashes and doesn't consume any events without service restart

Hi All,

This is what I am observing: we have a consumer that polls data from a
topic, processes it, and polls again, in a continuous loop.
At one point there was some bad data on the topic that the consumer could
not consume, probably because it couldn't deserialize the event due to an
incompatible Avro schema or something similar, and the consumer got an
error deserializing the event. Since the exception wasn't handled, it
crashed the consumer thread, which then stopped consuming data.

The question is how this kind of scenario can be handled:
1. Even if I catch the exception and log it, the consumer will, I think,
process the next event, so the bad event will be lost.
2. When the consumer goes for another poll, it would commit the offsets of
the previous poll, which includes the bad event, so the event will be lost.

How can this scenario be handled in the best possible way?

Re: kafka consumer thread crashes and doesn't consume any events without service restart

Posted by Ricardo Ferreira <ri...@riferrei.com>.
Pushkar,

You are not wrong. Any deserialization error that happens during the
poll() method will interrupt your code without giving it much
information about which offset failed. A workaround would be to parse the
message carried by the SerializationException and try to recover from
there, but that is a brittle approach.
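
As a rough illustration of that workaround, the sketch below pulls the topic, partition and offset out of the exception text with a regular expression. It assumes the message keeps the wording seen in this thread ("Error deserializing key/value for partition <topic>-<partition> at offset <n>"), which is not a stable contract. The class and method names are made up for the example. Newer Kafka clients, as far as I know, throw a RecordDeserializationException that carries the partition and offset directly, which is the safer route where it is available.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: recovers the failed position from the message text of a
// SerializationException thrown by poll(). The wording is taken from the trace
// in this thread and may differ between client versions.
final class FailedPositionParser {

    // Simple value holder for the parsed topic, partition and offset.
    static final class Position {
        final String topic;
        final int partition;
        final long offset;

        Position(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // The topic name may itself contain '-' or '.', so the greedy group takes
    // everything up to the last "-<digits>" that precedes " at offset".
    private static final Pattern FAILED_AT =
            Pattern.compile("for partition (.+)-(\\d+) at offset (\\d+)");

    // Returns the parsed position, or empty if the text has a different shape.
    static Optional<Position> parse(String exceptionMessage) {
        if (exceptionMessage == null) {
            return Optional.empty();
        }
        Matcher m = FAILED_AT.matcher(exceptionMessage);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new Position(
                m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3))));
    }
}
```

With the position in hand, the consumer could seek to offset + 1 on that partition to skip the record, or seek back to the same offset to retry it later. Skipping is only appropriate when the record itself is bad.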

Taking a closer look at the stack trace that you shared, it seems that
the real problem might be connectivity with Schema Registry. That is why
the innermost cause in your exception reports a 'Connection refused'.

```

Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.<init>(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
<------------- [5] Here, way after everything, it tries to connect to the service but fails.
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
<------------- [4] This is the part where the AvroDeserializer tries to contact Schema Registry to fetch the schema
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
<------------- [3] So far so good. No major deserialization errors
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
<------------- [2] Up to this point the record is read
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
<------------- [1] Things start here

```
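
The same reading of the "Caused by" chain can be done in code. The sketch below is only an illustration, with hypothetical class and method names: it walks the cause chain of whatever poll() threw and treats the failure as transient when an I/O error such as the ConnectException above sits anywhere in it. Treating every IOException as retriable is an assumption made for the example, not a rule from the Kafka or Schema Registry clients.

```java
import java.io.IOException;

// Hypothetical helper: decides whether a failure raised from poll() looks like a
// transient infrastructure problem rather than a genuinely unreadable record.
final class FailureInspector {

    // Walks the "Caused by" chain and reports whether any link is an IOException
    // (java.net.ConnectException, as in the trace above, is a subclass of it).
    static boolean looksTransient(Throwable error) {
        Throwable current = error;
        int depth = 0;
        // The depth limit guards against a cyclic cause chain.
        while (current != null && depth < 32) {
            if (current instanceof IOException) {
                return true;
            }
            current = current.getCause();
            depth++;
        }
        return false;
    }
}
```

A consumer loop could use such a check to decide between pausing and retrying the same offset (transient) and seeking past the record (bad data).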

Thanks,

-- Ricardo

Re: kafka consumer thread crashes and doesn't consume any events without service restart

Posted by Pushkar Deole <pd...@gmail.com>.
Hi Ricardo,

This is probably more complicated than that: the exception occurred
during Consumer.poll itself, so there is no ConsumerRecord for the
application to process, and hence the application doesn't know the offset
of the record where the poll failed.

Re: kafka consumer thread crashes and doesn't consume any events without service restart

Posted by Ricardo Ferreira <ri...@riferrei.com>.
Pushkar,

Kafka uses offsets to identify the position of each record within the
log, but the concept is more powerful than it looks. Committed offsets
are also used to keep track of which records have been successfully read
and which have not. When you commit an offset in the consumer, a message
is sent to Kafka, which in turn records the commit in an internal topic
called `__consumer_offsets`.

Point being: you can solve this problem elegantly by handling the
exception properly in your code and only committing the offset if the
record was deemed fully read, which means being able to deserialize the
record with no exceptions thrown. To do this, you will need to disable
auto commit and commit the offsets manually, either on a per-batch basis
or on a per-record basis.
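
A minimal sketch of those two pieces follows. The config keys are the standard consumer settings; the broker address, group id, and the class and method names are placeholders invented for the example. The second method only computes which offset to commit for one partition after a batch. It does not talk to Kafka.

```java
import java.util.List;
import java.util.Properties;

// Hypothetical sketch: consumer settings with auto commit switched off, and the
// offset to commit for one partition after processing a batch.
final class ManualCommitSketch {

    // Builds consumer settings. The broker address and group id are placeholders.
    static Properties consumerSettings() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "example-group");           // placeholder
        props.put("enable.auto.commit", "false");         // commit manually instead
        return props;
    }

    // Takes the offsets of one partition's batch (in order) and one flag per
    // record saying whether processing succeeded. Returns the offset to commit,
    // which is the position of the first record that still needs processing.
    // Returns -1 when the first record failed and there is nothing new to commit.
    static long offsetToCommit(List<Long> offsets, List<Boolean> succeeded) {
        if (offsets.size() != succeeded.size()) {
            throw new IllegalArgumentException("one flag per offset is required");
        }
        long next = -1L;
        for (int i = 0; i < offsets.size(); i++) {
            if (!succeeded.get(i)) {
                break; // stop at the first failure so it is read again later
            }
            next = offsets.get(i) + 1; // a committed offset means "next record to read"
        }
        return next;
    }
}
```

The returned value is what would be handed to commitSync for that partition. After a failure the consumer would also need to seek back to that offset, because within a running consumer the next poll() continues from its in-memory position, not from the last commit.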

Records after the last committed offset will be picked up again by the
same or another consumer in the group, typically after a restart or a
rebalance. This is where *Gerbrand's* suggestion comes in. You might want
to have another stream processor specifically handling those outliers and
sending them to a DLQ (dead letter queue) topic for manual reprocessing.
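
One possible shape for that handling is sketched below: try a record a bounded number of times and, if it keeps failing, hand it to a dead-letter sink so the main flow is not blocked. Everything here is hypothetical (class name, the use of plain callbacks in place of a real producer), and it applies only to records that were already deserialized, since failures inside poll() never reach application code as records.

```java
import java.util.function.Consumer;

// Hypothetical sketch: process a record with a bounded number of attempts and
// route it to a dead-letter sink if every attempt fails.
final class RetryThenDeadLetter<T> {

    private final Consumer<T> processor;  // the normal processing logic
    private final Consumer<T> deadLetter; // e.g. a producer writing to a DLQ topic
    private final int maxAttempts;

    RetryThenDeadLetter(Consumer<T> processor, Consumer<T> deadLetter, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.processor = processor;
        this.deadLetter = deadLetter;
        this.maxAttempts = maxAttempts;
    }

    // Returns true if the record was processed, false if it was dead-lettered.
    // Either way the caller may treat the record as handled and commit past it.
    boolean handle(T record) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processor.accept(record);
                return true;
            } catch (RuntimeException e) {
                // A real implementation would log here and back off between attempts.
            }
        }
        deadLetter.accept(record);
        return false;
    }
}
```

Because the record ends up either processed or in the DLQ, its offset can be committed in both cases, which keeps one bad record from stalling the partition.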

Thanks,

-- Ricardo

On 6/18/20 7:45 AM, Pushkar Deole wrote:
> Hi Gerbrand,
>
> thanks for the update, however if i dig more into it, the issue is because
> of schema registry issue and the schema registry not accessible. So the
> error is coming during poll operation itself:
> So this is a not a bad event really but the event can't be deserialized
> itself due to schema not available. Even if this record is skipped, the
> next record will meet the same error.
>
> Exception in thread "Thread-9"
> org.apache.kafka.common.errors.SerializationException: Error deserializing
> key/value for partition tenant.avro-2 at offset 1. If needed, please seek
> past the record to continue consumption.
> Caused by: org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id 93
> Caused by: java.net.ConnectException: Connection refused (Connection
> refused)
> at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
> at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
> at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown
> Source)
> at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
> at java.base/java.net.Socket.connect(Unknown Source)
> at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.<init>(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
> at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown
> Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown
> Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown
> Source)
> at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown
> Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown
> Source)
> at
> java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown
> Source)
> at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
> at
> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
> at
> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
> at
> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
> at
> io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
> at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
> at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
> at
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
> at
> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
> at
> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
> at
> io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
> at
> org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
> at
> org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
> at
> org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
> at
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
> at
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
> at
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
> at
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
> at
> com.avaya.analytics.dsi.DsiConsumer.runAdminConsumer(DsiConsumer.java:797)
> at java.base/java.lang.Thread.run(Unknown Source)
>
> On Thu, Jun 18, 2020 at 3:17 PM Gerbrand van Dieijen <ge...@vandieijen.nl>
> wrote:
>
>> Hello Pushkar,
>>
>>
>> I'd split records/events into categories based on the error:
>> - Events that can be parsed or otherwise handled correctly, i.e. good
>> events
>> - Fatal errors, like parsing errors, empty or incorrect values, etc., i.e.
>> bad events
>> - Non-fatal errors, like database-connection failure, I/O failure, out of
>> memory, and others that could be retried
>>
>> It is best to avoid doing anything blocking while handling the error, so
>> create a separate stream for each. That way 'good' events don't have to
>> wait for the handling of 'bad' events.
>>
>> Any events with fatal errors you could store in a separate topic, or send
>> to some monitoring/logging system. As a simple start you could send the
>> erroneous events to a separate topic named something like 'errorevents'.
>> Any non-fatal errors could be retried. Last time I used Akka for that (
>> https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html) but
>> afaik KStreams has a mechanism for that as well. You could also store
>> records that you want to retry in a separate topic 'retry'.
>> Do not store records that you want to retry back into the original
>> topic! If you do that, you run a great risk of overloading your whole
>> Kafka cluster.

Re: kafka consumer thread crashes and doesn't consume any events without service restart

Posted by Pushkar Deole <pd...@gmail.com>.
Hi Gerbrand,

Thanks for the update. However, digging further into it, the issue is
that the schema registry is not accessible, so the error occurs during
the poll operation itself.
So this is not really a bad event; the event can't be deserialized because
the schema is not available. Even if this record is skipped, the next
record will hit the same error.

Exception in thread "Thread-9" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tenant.avro-2 at offset 1. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 93
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.<init>(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
at com.avaya.analytics.dsi.DsiConsumer.runAdminConsumer(DsiConsumer.java:797)
at java.base/java.lang.Thread.run(Unknown Source)
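
The cause chain in the trace above ends in a java.net.ConnectException, which suggests one way to tell an unreachable schema registry apart from a genuinely bad record. A minimal sketch, assuming the consumer loop catches the exception thrown by poll(); the class and method names are hypothetical and not part of the Kafka client:

```java
import java.net.ConnectException;

public final class DeserializationFailures {
    private DeserializationFailures() {}

    /**
     * Returns true if any exception in the cause chain is a ConnectException,
     * i.e. the failure looks like a connectivity problem rather than bad data.
     */
    public static boolean causedByConnectionFailure(Throwable error) {
        // Bound the walk so an unusual, cyclic cause chain cannot loop forever.
        int depth = 0;
        for (Throwable t = error; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t instanceof ConnectException) {
                return true;
            }
        }
        return false;
    }
}
```

If the check returns true, seeking past the record would probably only throw away good data, since the next record would fail the same way; backing off and polling again once the registry is reachable seems the safer choice. Only when it returns false does seeking past the failing offset, as the exception message suggests, look reasonable.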

On Thu, Jun 18, 2020 at 3:17 PM Gerbrand van Dieijen <ge...@vandieijen.nl>
wrote:

> Hello Pushkar,
>
>
> I'd split records/events into categories based on the error:
> - Events that can be parsed or otherwise handled correctly, i.e. good
> events
> - Fatal errors, like parsing errors, empty or incorrect values, etc., i.e.
> bad events
> - Non-fatal errors, like database-connection failure, I/O failure, out of
> memory, and others
>   that could be retried
>
> Best to avoid doing something blocking while handling the error, so create
> a separate stream for each. That way 'good' events don't have to wait for
> the handling of 'bad' events.
>
> Any fatal events you could store in a separate topic, or send to some
> monitoring/logging system. As a simple start you could send the erroneous
> events to a separate topic named something like 'errorevents'.
> Any non-fatal errors could be retried. Last time I used Akka for that (
> https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html) but
> AFAIK KStreams has a mechanism for that as well. You could also store
> records that you want to retry in a separate topic 'retry'.
> Do not store records that you want to retry back into the original
> topic! If you do that, you run a great risk of overloading your whole
> Kafka cluster.
>
> On 18-06-2020 09:55, Pushkar Deole <pd...@gmail.com> wrote:
>
>     Hi All,
>
>     This is what I am observing: we have a consumer which polls data from
>     topic, does the processing, again polls data which keeps happening
>     continuously.
>     At one time, there was some bad data on the topic which could not be
>     consumed by consumer, probably because it couldn't deserialize the event
>     due to incompatible avro schema or something similar,
>     and consumer got error deserializing event. Since the exception wasn't
>     handled, it crashed the consumer thread which then stopped consuming data.
>
>     The question here is how these kind of scenarios can be handled:
>     1. Even if I catch the exception and log it, the consumer will i think
>     process the next event. So the bad event will be lost
>     2. When consumer goes for another poll, it would commit offsets of previous
>     poll which includes bad event, So the event will be lost
>
>     How can this scenario be handled in best possible way?
>

Re: kafka consumer thread crashes and doesn't consume any events without service restart

Posted by Gerbrand van Dieijen <ge...@vandieijen.nl>.
Hello Pushkar,


I'd split records/events into categories based on the error:
- Events that can be parsed or otherwise handled correctly, i.e. good events
- Fatal errors, like parsing errors, empty or incorrect values, etc., i.e. bad events
- Non-fatal errors, like database-connection failure, I/O failure, out of memory, and others
  that could be retried

Best to avoid doing something blocking while handling the error, so create a separate stream for each. That way 'good' events don't have to wait for the handling of 'bad' events.

Any fatal events you could store in a separate topic, or send to some monitoring/logging system. As a simple start you could send the erroneous events to a separate topic named something like 'errorevents'.
Any non-fatal errors could be retried. Last time I used Akka for that (https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html) but AFAIK KStreams has a mechanism for that as well. You could also store records that you want to retry in a separate topic 'retry'.
Do not store records that you want to retry back into the original topic! If you do that, you run a great risk of overloading your whole Kafka cluster.
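
The categories above could be sketched as a small routing function. This is only an illustration under assumptions: the class, enum and method names are made up, treating IOException as the "retryable" case is an arbitrary choice, and actually producing to the topics is left out. The topic names 'errorevents' and 'retry' are the ones suggested above.

```java
import java.io.IOException;
import java.util.Optional;

public final class ErrorRouting {
    private ErrorRouting() {}

    /** The three categories of events described above. */
    public enum Category { GOOD, FATAL, RETRYABLE }

    /** Classifies the outcome of processing one event; null means no failure. */
    public static Category classify(Throwable failure) {
        if (failure == null) {
            return Category.GOOD;
        }
        // Assumption for this sketch: I/O problems (database, network) are transient.
        if (failure instanceof IOException) {
            return Category.RETRYABLE;
        }
        // Everything else (parse errors, invalid values, ...) is treated as fatal.
        return Category.FATAL;
    }

    /** Topic a failed event should be copied to; empty for good events. */
    public static Optional<String> targetTopic(Category category) {
        switch (category) {
            case FATAL:
                return Optional.of("errorevents");
            case RETRYABLE:
                return Optional.of("retry");
            default:
                return Optional.empty();
        }
    }
}
```

Note that neither branch ever returns the original topic, in line with the warning above about not writing retries back into it.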

On 18-06-2020 09:55, Pushkar Deole <pd...@gmail.com> wrote:

    Hi All,

    This is what I am observing: we have a consumer which polls data from
    topic, does the processing, again polls data which keeps happening
    continuously.
    At one time, there was some bad data on the topic which could not be
    consumed by consumer, probably because it couldn't deserialize the event
    due to incompatible avro schema or something similar,
    and consumer got error deserializing event. Since the exception wasn't
    handled, it crashed the consumer thread which then stopped consuming data.

    The question here is how these kind of scenarios can be handled:
    1. Even if I catch the exception and log it, the consumer will i think
    process the next event. So the bad event will be lost
    2. When consumer goes for another poll, it would commit offsets of previous
    poll which includes bad event, So the event will be lost

    How can this scenario be handled in best possible way?