Posted to users@kafka.apache.org by Marina Popova <pp...@protonmail.com> on 2017/09/29 17:14:50 UTC

Kafka FileStreamSinkConnector handling of bad messages

Hi,
I have the FileStreamSinkConnector working perfectly fine in distributed mode as long as only good messages are sent to the input event topic.

However, if I send a bad message - for example, one that is not valid JSON - while using the JSON converter for keys and values as follows:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

then the task that handles this message fails (which is expected) and goes into the FAILED state. (And if I have more than one task, they all eventually get killed, since each of them ends up reprocessing the same bad message once a rebalance happens.)
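(For reference, this is visible through the standard Connect REST status endpoint - localhost:8083 and the connector name local-file-sink below are just my local setup:

 curl -s localhost:8083/connectors/local-file-sink/status

which reports the connector state plus each task as RUNNING or FAILED, along with the failure stack trace.)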

So I tried to restart the failed task as follows:
 curl -X POST localhost:8083/connectors/local-file-sink/tasks/0/restart
and it does restart, but it immediately reads the same bad message and fails again.

It looks like the offset for the bad message never gets committed, so the Kafka consumer behind the task keeps re-reading it and failing, over and over.

Is there a way to restart the task from the latest offset, or to somehow instruct the FileStreamSinkConnector to skip bad messages?
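(One thing I was wondering about, as a possible workaround: would it be reasonable to reset the offsets of the consumer group that backs the sink from the outside? A rough sketch, assuming a 0.11+ broker/CLI, that the connector is deleted or its tasks are stopped first so the group has no active members, and with my-events-topic as a placeholder topic name - Connect names the group connect-<connector name>, so here connect-local-file-sink:

 kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
   --group connect-local-file-sink --topic my-events-topic \
   --reset-offsets --to-latest --execute

or --shift-by 1 on just the affected partition to skip only the poison record. I have not verified either variant against this connector, though.)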

I found this older post that describes exactly my situation:
https://groups.google.com/forum/#!topic/confluent-platform/hkyDslGym9g
- and the suggestion there was to add handling of these errors to the connector code itself. However, I am using the pre-packaged file sink connector and would rather not modify its code.
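(The closest thing to a no-code workaround I can think of so far, assuming we don't actually need structured JSON downstream: since the file sink just writes each record value out as a string, swapping the converters for the plain StringConverter should sidestep JSON parsing entirely, e.g.

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

so a malformed payload would land in the output file as-is instead of failing the task, at the cost of losing any JSON validation. Would that be a reasonable approach?)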

Thanks!!
Marina

Sent with [ProtonMail](https://protonmail.com) Secure Email.

Re: Kafka FileStreamSinkConnector handling of bad messages

Posted by Ted Yu <yu...@gmail.com>.
Considering Ewen's response, you could open a JIRA for applying that
suggestion to the FileStreamSinkConnector.

Cheers

Re: Kafka FileStreamSinkConnector handling of bad messages

Posted by Marina Popova <pp...@protonmail.com>.
Thank you, Dhawan!
You have basically validated our approach as well.
We have been using our own "connector"-like service to batch events from Kafka and index them into Elasticsearch (https://github.com/BigDataDevs/kafka-elasticsearch-consumer) for a while now. The main reason we did this, instead of using connectors or bigger frameworks like Logstash, is that we wanted direct control over which exceptions we treat as "recoverable" and which as non-recoverable.

I have been watching the Connect space recently and was hoping it might be time to give connectors a try, but it looks like they do not offer the same level of direct control over exception handling yet (which is normal for higher-level products, of course - the higher the level of abstraction, the less control over details you have).

Thank you!
Marina

Sent with [ProtonMail](https://protonmail.com) Secure Email.

Re: Kafka FileStreamSinkConnector handling of bad messages

Posted by Dhawan Gajendran <dh...@datavisor.com>.
Hi Marina,

We hit a similar problem with our S3 connectors. We added a level of
indirection - a JSON-validating microservice - in front of the Kafka topic.
The microservice publishes messages that are not valid JSON to a separate
Kafka topic called error-jsons, and we drain those messages with a custom
consumer that treats all messages as raw bytes. Hope that helps.
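(Roughly, the validation/routing step boils down to something like the sketch below - not our actual service, just the idea, with placeholder broker/topic names and the stock console producer standing in for a real producer:

 # valid JSON goes to the main topic; anything that fails to parse goes to error-jsons
 while IFS= read -r msg; do
   if printf '%s' "$msg" | jq empty >/dev/null 2>&1; then
     printf '%s\n' "$msg" | kafka-console-producer.sh --broker-list localhost:9092 --topic events
   else
     printf '%s\n' "$msg" | kafka-console-producer.sh --broker-list localhost:9092 --topic error-jsons
   fi
 done < incoming-events.txt

One producer invocation per message is obviously not something you'd run in production; in the real service the routing decision is the same, just done inside a long-lived producer.)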

Thanks,
Dhawan

Re: Kafka FileStreamSinkConnector handling of bad messages

Posted by Marina Popova <pp...@protonmail.com>.
Hi,
I wanted to give this question a second try, as I feel it is very important to understand how to control error cases with connectors.
Any advice on how to handle "poison" messages in the case of connectors?

Thanks!
Marina
