Posted to user@flink.apache.org by ashish pok <as...@yahoo.com> on 2018/01/19 19:59:53 UTC

Kafka Producer timeout causing data loss

Team,
One more question to the community regarding hardening Flink Apps.
Let me start off by saying we do have known Kafka bottlenecks which we are in the midst of resolving. So during certain times of day, a lot of our Flink Apps are seeing Kafka Producer timeout issues. Most of the logs are some flavor of this:
java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
Timeouts are not necessarily good, but I am sure we understand this is bound to happen (hopefully less often).
The issue for us, however, is that it almost looks like Flink is stopping and restarting all operators (a lot of other operators including Map, Reduce and Process functions, if not all of them) along with the Kafka Producers. We are processing a pretty substantial load in Flink and don't really intend to enable RocksDB/HDFS checkpointing in some of these apps - we are OK with sustaining some data loss when an app crashes completely or something along those lines. However, what we are noticing here is that all the data held in memory for the sliding window functions is also lost completely because of this. I would have thought that, because of the retry settings in the Kafka Producer, even those 28 events in the queue would have been recovered, let alone the over a million events in memory state waiting to be Folded/Reduced for the sliding window. This doesn't feel right :)
Is the only way to solve this to enable RocksDB/HDFS checkpointing? Why would almost the whole Job Graph restart on a single operator timeout? Do I need to do something simple like disabling operator chaining? We really, really are trying to use only memory state and nothing heavier for these heavy-hitting streams.
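
[Editor's note: window state survives this kind of full job restart only if checkpointing is enabled, and a RocksDB backend is not strictly required for that - heap state with filesystem snapshots is an alternative. A minimal, hypothetical sketch; the class name, checkpoint URI and interval below are placeholders, not values from this thread:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HeapStateWithFsCheckpoints {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Snapshot operator/window state every 60s so a restart can restore it.
            env.enableCheckpointing(60_000);
            // Working state stays on the JVM heap (no RocksDB); only snapshots go to the file system.
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));  // placeholder URI
            env.fromElements("a", "b", "c").print();  // stand-in pipeline
            env.execute("heap-state-with-fs-checkpoints");
        }
    }
]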
Thanks for your help,
Ashish

Re: Kafka Producer timeout causing data loss

Posted by Vishal Santoshi <vi...@gmail.com>.
The reorder issue can be resolved by setting
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to 1, if we are talking pure Kafka
producer configs (and I believe they port over to the Flink Kafka connector).
This does limit the concurrency (at the TCP level) when Kafka is back
up - an issue which is not very limiting once we have understood the Kafka
producer's batch.size and linger.ms configurations and set them up optimally.
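
[Editor's note: a rough sketch of the producer settings mentioned above. The broker address, class name and numeric values are purely illustrative, and the Properties would simply be handed to the FlinkKafkaProducer010 constructor:

    import java.util.Properties;

    public class OrderingSafeProducerProps {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder broker list
            // Only one in-flight request per connection, so retries cannot reorder records.
            props.setProperty("max.in.flight.requests.per.connection", "1");
            // Bigger batches plus a small linger reduce the request rate once the broker recovers.
            props.setProperty("batch.size", "65536");  // bytes, illustrative
            props.setProperty("linger.ms", "50");      // illustrative
            System.out.println(props);
        }
    }
]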

On Thu, Jan 25, 2018 at 11:41 AM, Elias Levy <fe...@gmail.com>
wrote:

> Try setting the Kafka producer config option for number of retries
> ("retries") to a large number, to avoid the timeout.  It defaults to zero.
> Do note that retries may result in reordered records.

Re: Kafka Producer timeout causing data loss

Posted by Elias Levy <fe...@gmail.com>.
Try setting the Kafka producer config option for number of retries
("retries") to a large number, to avoid the timeout.  It defaults to zero.
Do note that retries may result in reordered records.
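
[Editor's note: a minimal sketch of that retry configuration. The broker address, class name and values are illustrative; raising request.timeout.ms, which governs how long a batch may sit before the client expires it in this Kafka client generation, is an additional commonly suggested knob rather than something stated in this thread:

    import java.util.Properties;

    public class RetryingProducerProps {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder broker list
            // Retry sends instead of failing the job on the first expired batch.
            props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
            // Allow batches to wait longer before being expired with a TimeoutException.
            props.setProperty("request.timeout.ms", "120000");  // illustrative value
            System.out.println(props);
        }
    }
]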

On Wed, Jan 24, 2018 at 7:07 PM, Ashish Pokharel <as...@yahoo.com>
wrote:

> Fabian,
>
> Thanks for your feedback - very helpful as usual !
>
> This is sort of becoming a huge problem for us right now because of our
> Kafka situation. For some reason I missed this detail going through the
> docs. We are definitely seeing heavy dose of data loss when Kafka timeouts
> are happening.
>
> We actually have 1.4 version - I’d be interested to understand if anything
> can be done in 1.4 to prevent this scenario.
>
> One other thought I had was an ability to invoke “Checkpointing before
> Restart / Recovery” -> meaning I don’t necessarily need to checkpoint
> periodically but I do want to make sure on a explicit restart /
> rescheduling like this, we do have a decent “last known” state. Not sure if
> this is currently doable.
>
> Thanks, Ashish
>
> On Jan 23, 2018, at 5:03 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Ashish,
>
> Originally, Flink always performed full recovery in case of a failure,
> i.e., it restarted the complete application.
> There is some ongoing work to improve this and make recovery more
> fine-grained (FLIP-1 [1]).
> Some parts have been added for 1.3.0.
>
> I'm not familiar with the details, but Stefan (in CC) should be able to
> answer your specific question.
>
> Best, Fabian
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 1+%3A+Fine+Grained+Recovery+from+Task+Failures
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+:+Fine+Grained+Recovery+from+Task+Failures>
>
> 2018-01-19 20:59 GMT+01:00 ashish pok <as...@yahoo.com>:
>
>> Team,
>>
>> One more question to the community regarding hardening Flink Apps.
>>
>> Let me start off by saying we do have known Kafka bottlenecks which we
>> are in the midst of resolving. So during certain times of day, a lot of our
>> Flink Apps are seeing Kafka Producer timeout issues. Most of the logs are
>> some flavor of this:
>>
>> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s)
>> for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus
>> linger time
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProduc
>> erBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProduc
>> er010.invokeInternal(FlinkKafkaProducer010.java:302)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProduc
>> er010.processElement(FlinkKafkaProducer010.java:421)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.pushToOperator(OperatorChain.java:549)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.collect(OperatorChain.java:524)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.collect(OperatorChain.java:504)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:831)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:809)
>> at org.apache.flink.streaming.api.operators.StreamMap.processEl
>> ement(StreamMap.java:41)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.pushToOperator(OperatorChain.java:549)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.collect(OperatorChain.java:524)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$Copyi
>> ngChainingOutput.collect(OperatorChain.java:504)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:831)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOpera
>> tor$CountingOutput.collect(AbstractStreamOperator.java:809)
>> at org.apache.flink.streaming.api.operators.StreamMap.processEl
>> ement(StreamMap.java:41)
>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.p
>> rocessInput(StreamInputProcessor.java:207)
>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
>> run(OneInputStreamTask.java:69)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28
>> record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation
>> plus linger time
>>
>> Timeouts are not necessarily good but I am sure we understand this is
>> bound to happen (hopefully lesser).
>>
>> The issue for us however is it almost looks like Flink is stopping and
>> restarting all operators (a lot of other operators including Map, Reduce
>> and Process functions if not all) along with Kafka Producers. We are
>> processing pretty substantial load in Flink and dont really intend to
>> enable Rocks/HDFS checkpointing in some of these Apps - we are ok to
>> sustain some data loss when App crashes completely or something along those
>> lines. However, what we are noticing here is all the data that are in
>> memory for sliding window functions are also lost completely because of
>> this. I would have thought because of the retry settings in Kafka Producer,
>> even those 28 events in queue should have been recovered let alone over a
>> million events in Memory State waiting to be Folded/Reduced for the sliding
>> window. This doesnt feel right :)
>>
>> Is only way to solve this is by creating Rocks/HDFS checkpoint? Why would
>> almost all Job Graph restart on an operator timeout? Do I need to do
>> something simple like disable Operator chaining? We really really are
>> trying to just use Memory and not any other state for these heavy hitting
>> streams.
>>
>> Thanks for your help,
>>
>> Ashish
>>
>
>
>

Re: Kafka Producer timeout causing data loss

Posted by Ashish Pokharel <as...@yahoo.com>.
Fabian,

Thanks for your feedback - very helpful as usual !

This is sort of becoming a huge problem for us right now because of our Kafka situation. For some reason I missed this detail going through the docs. We are definitely seeing a heavy dose of data loss when Kafka timeouts are happening.

We are actually on version 1.4 - I’d be interested to understand if anything can be done in 1.4 to prevent this scenario.

One other thought I had was an ability to invoke “Checkpointing before Restart / Recovery” -> meaning I don’t necessarily need to checkpoint periodically, but I do want to make sure that on an explicit restart / rescheduling like this, we do have a decent “last known” state. Not sure if this is currently doable.
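
[Editor's note: there is no built-in "checkpoint just before restart" hook in Flink 1.4 as far as we can tell; the closest approximation is an infrequent periodic checkpoint combined with a bounded restart strategy. A hypothetical sketch - the class name, interval, attempt count and delay are made up for illustration:

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BoundedRestartSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Infrequent checkpoints keep overhead low but still give a "last known" state to restore.
            env.enableCheckpointing(5 * 60_000);  // every 5 minutes, illustrative
            // Cap restart attempts so a persistent Kafka outage eventually fails the job visibly.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000));  // 3 attempts, 10s apart
            env.fromElements(1, 2, 3).print();  // stand-in pipeline
            env.execute("bounded-restart-sketch");
        }
    }
]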

Thanks, Ashish

Re: Kafka Producer timeout causing data loss

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ashish,

Originally, Flink always performed full recovery in case of a failure,
i.e., it restarted the complete application.
There is some ongoing work to improve this and make recovery more
fine-grained (FLIP-1 [1]).
Some parts have been added for 1.3.0.

I'm not familiar with the details, but Stefan (in CC) should be able to
answer your specific question.

Best, Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
