You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by John Smith <ja...@gmail.com> on 2020/10/21 16:04:54 UTC

What does Kafka Error sending fetch request mean for the Kafka source?

Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-21 15:
48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer
clientId=consumer-2, groupId=xxxxxx-import] Error sending fetch request
(sessionId=806089934, epoch=INITIAL) to node 0:
org.apache.kafka.common.errors.DisconnectException.

Obviously it looks like the consumer is getting disconnected and from what
it seems it's either a Kafka bug on the way it handles the EPOCH or
possibly version mismatch between client and brokers. That's fine I can
look at upgrading the client and/or Kafka. But I'm trying to understand
what happens in terms of the source and the sink. It looks let we get
duplicates on the sink and I'm guessing it's because the consumer is
failing and at that point Flink stays on that checkpoint until it can
reconnect and process that offset and hence the duplicates downstream?

Re: What does Kafka Error sending fetch request mean for the Kafka source?

Posted by Robert Metzger <rm...@apache.org>.
Thanks a lot.
Just a clarification, it's not the Kafka source that is configured
AT_LEAST_ONCE, it is the Flink checkpointing mode as a whole, for all
operations.
This has no effect on regular operations, only on recovery records may be
send multiple times... but it leads to lower latency. I guess this makes
sense in your case, since you are deduping based on a unique key.

For the longer checkpoints, adjusting timeouts makes sense.

On Tue, Nov 3, 2020 at 6:04 PM John Smith <ja...@gmail.com> wrote:

> Kafka source is configured as AT_LEAST_ONCE and the JDBC sink handles
> duplicates with unique key/constraint and logs duplicates in a separate SQL
> table. And essentially it gives us EXACTLY_ONCE semantics.
>
> That's not a problem, it works great!
>
> 1- I was curious if that specific Kafka message was the cause of the
> duplicates, but if I understand correctly Becket it's not the source of the
> duplicates and I wanted to confirm that.
> 2- I started monitoring checkpoints on average they are 100ms, during peak
> we started seeing checkpoints takie 20s-40s+... My checkpoint is configed
> as follows:
>      - env.enableCheckpointing(60000);
>      -
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
>      -
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>      - env.getCheckpointConfig().setCheckpointTimeout(60000);
>      - env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
> 3- Based on above it's possible that the sink takes longer than 60seconds
> sometimes...
>     - Looking at adjusting timeouts.
>     - Looking at reducing the load of the sink and reduce how long it
> takes in general.
>
> On Tue, 3 Nov 2020 at 10:49, Robert Metzger <rm...@apache.org> wrote:
>
>> How did you configure the Kafka source as at least once? Afaik the source
>> is always exactly-once (as long as there aren't any restarts).
>>
>> Are you seeing the duplicates in the context of restarts of the Flink job?
>>
>> On Tue, Nov 3, 2020 at 1:54 AM John Smith <ja...@gmail.com> wrote:
>>
>>> Sorry, got confused with your reply... Does the message "Error sending
>>> fetch request" cause retries/duplicates down stream or it doesn't?
>>>
>>> I'm guessing it's even before the source can even send anything
>>> downstream...
>>>
>>>
>>> On Sat, 31 Oct 2020 at 09:10, John Smith <ja...@gmail.com> wrote:
>>>
>>>> Hi my flow is Kafka Source -> Transform -> JDBC Sink
>>>>
>>>> Kafka Source is configured as at least once and JDBC prevents
>>>> duplicates with unique key constraint and duplicate is logged in separate
>>>> table. So the destination data is exactly once.
>>>>
>>>> The duplicates happen every so often, looking at check point history
>>>> there was some checkpoints that failed, but the history isn't long enough
>>>> to go back and look. I'm guessing I will have to adjust the checkpointing
>>>> times a bit...
>>>>
>>>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <be...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> The log message you saw from Kafka consumer simply means the consumer
>>>>> was disconnected from the broker that FetchRequest was supposed to be sent
>>>>> to. The disconnection can happen in many cases, such as broker down,
>>>>> network glitches, etc. The KafkaConsumer will just reconnect and retry
>>>>> sending that FetchRequest again. This won't cause duplicate messages in
>>>>> KafkaConsumer or Flink. Have you enabled exactly-once semantic for your
>>>>> Kafka sink? If not, the downstream might see duplicates in case of Flink
>>>>> failover or occasional retry in the KafkaProducer of the Kafka sink.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <ja...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Any thoughts this doesn't seem to create duplicates all the time or
>>>>>> maybe it's unrelated as we are still seeing the message and there is no
>>>>>> duplicates...
>>>>>>
>>>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <
>>>>>> java.dev.mtl@gmail.com> wrote:
>>>>>>
>>>>>>> And yes my downstream is handling the duplicates in an idempotent
>>>>>>> way so we are good on that point. But just curious what the behaviour is on
>>>>>>> the source consumer when that error happens.
>>>>>>>
>>>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <ja...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-
>>>>>>>> 10-21 15:48:57,625 INFO org.apache.kafka.clients.
>>>>>>>> FetchSessionHandler - [Consumer clientId=consumer-2,
>>>>>>>> groupId=xxxxxx-import] Error sending fetch request (sessionId=
>>>>>>>> 806089934, epoch=INITIAL) to node 0:
>>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>>
>>>>>>>> Obviously it looks like the consumer is getting disconnected and
>>>>>>>> from what it seems it's either a Kafka bug on the way it handles the EPOCH
>>>>>>>> or possibly version mismatch between client and brokers. That's fine I can
>>>>>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>>>>>> what happens in terms of the source and the sink. It looks let we get
>>>>>>>> duplicates on the sink and I'm guessing it's because the consumer is
>>>>>>>> failing and at that point Flink stays on that checkpoint until it can
>>>>>>>> reconnect and process that offset and hence the duplicates downstream?
>>>>>>>>
>>>>>>>

Re: What does Kafka Error sending fetch request mean for the Kafka source?

Posted by John Smith <ja...@gmail.com>.
Kafka source is configured as AT_LEAST_ONCE and the JDBC sink handles
duplicates with unique key/constraint and logs duplicates in a separate SQL
table. And essentially it gives us EXACTLY_ONCE semantics.

That's not a problem, it works great!

1- I was curious if that specific Kafka message was the cause of the
duplicates, but if I understand correctly Becket it's not the source of the
duplicates and I wanted to confirm that.
2- I started monitoring checkpoints on average they are 100ms, during peak
we started seeing checkpoints takie 20s-40s+... My checkpoint is configed
as follows:
     - env.enableCheckpointing(60000);
     -
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
     -
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
     - env.getCheckpointConfig().setCheckpointTimeout(60000);
     - env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
3- Based on above it's possible that the sink takes longer than 60seconds
sometimes...
    - Looking at adjusting timeouts.
    - Looking at reducing the load of the sink and reduce how long it takes
in general.

On Tue, 3 Nov 2020 at 10:49, Robert Metzger <rm...@apache.org> wrote:

> How did you configure the Kafka source as at least once? Afaik the source
> is always exactly-once (as long as there aren't any restarts).
>
> Are you seeing the duplicates in the context of restarts of the Flink job?
>
> On Tue, Nov 3, 2020 at 1:54 AM John Smith <ja...@gmail.com> wrote:
>
>> Sorry, got confused with your reply... Does the message "Error sending
>> fetch request" cause retries/duplicates down stream or it doesn't?
>>
>> I'm guessing it's even before the source can even send anything
>> downstream...
>>
>>
>> On Sat, 31 Oct 2020 at 09:10, John Smith <ja...@gmail.com> wrote:
>>
>>> Hi my flow is Kafka Source -> Transform -> JDBC Sink
>>>
>>> Kafka Source is configured as at least once and JDBC prevents duplicates
>>> with unique key constraint and duplicate is logged in separate table. So
>>> the destination data is exactly once.
>>>
>>> The duplicates happen every so often, looking at check point history
>>> there was some checkpoints that failed, but the history isn't long enough
>>> to go back and look. I'm guessing I will have to adjust the checkpointing
>>> times a bit...
>>>
>>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <be...@gmail.com>
>>> wrote:
>>>
>>>> Hi John,
>>>>
>>>> The log message you saw from Kafka consumer simply means the consumer
>>>> was disconnected from the broker that FetchRequest was supposed to be sent
>>>> to. The disconnection can happen in many cases, such as broker down,
>>>> network glitches, etc. The KafkaConsumer will just reconnect and retry
>>>> sending that FetchRequest again. This won't cause duplicate messages in
>>>> KafkaConsumer or Flink. Have you enabled exactly-once semantic for your
>>>> Kafka sink? If not, the downstream might see duplicates in case of Flink
>>>> failover or occasional retry in the KafkaProducer of the Kafka sink.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <ja...@gmail.com>
>>>> wrote:
>>>>
>>>>> Any thoughts this doesn't seem to create duplicates all the time or
>>>>> maybe it's unrelated as we are still seeing the message and there is no
>>>>> duplicates...
>>>>>
>>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <ja...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> And yes my downstream is handling the duplicates in an idempotent way
>>>>>> so we are good on that point. But just curious what the behaviour is on the
>>>>>> source consumer when that error happens.
>>>>>>
>>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <ja...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-
>>>>>>> 10-21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler
>>>>>>> - [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error
>>>>>>> sending fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>>
>>>>>>> Obviously it looks like the consumer is getting disconnected and
>>>>>>> from what it seems it's either a Kafka bug on the way it handles the EPOCH
>>>>>>> or possibly version mismatch between client and brokers. That's fine I can
>>>>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>>>>> what happens in terms of the source and the sink. It looks let we get
>>>>>>> duplicates on the sink and I'm guessing it's because the consumer is
>>>>>>> failing and at that point Flink stays on that checkpoint until it can
>>>>>>> reconnect and process that offset and hence the duplicates downstream?
>>>>>>>
>>>>>>

Re: What does Kafka Error sending fetch request mean for the Kafka source?

Posted by Robert Metzger <rm...@apache.org>.
How did you configure the Kafka source as at least once? Afaik the source
is always exactly-once (as long as there aren't any restarts).

Are you seeing the duplicates in the context of restarts of the Flink job?

On Tue, Nov 3, 2020 at 1:54 AM John Smith <ja...@gmail.com> wrote:

> Sorry, got confused with your reply... Does the message "Error sending
> fetch request" cause retries/duplicates down stream or it doesn't?
>
> I'm guessing it's even before the source can even send anything
> downstream...
>
>
> On Sat, 31 Oct 2020 at 09:10, John Smith <ja...@gmail.com> wrote:
>
>> Hi my flow is Kafka Source -> Transform -> JDBC Sink
>>
>> Kafka Source is configured as at least once and JDBC prevents duplicates
>> with unique key constraint and duplicate is logged in separate table. So
>> the destination data is exactly once.
>>
>> The duplicates happen every so often, looking at check point history
>> there was some checkpoints that failed, but the history isn't long enough
>> to go back and look. I'm guessing I will have to adjust the checkpointing
>> times a bit...
>>
>> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <be...@gmail.com>
>> wrote:
>>
>>> Hi John,
>>>
>>> The log message you saw from Kafka consumer simply means the consumer
>>> was disconnected from the broker that FetchRequest was supposed to be sent
>>> to. The disconnection can happen in many cases, such as broker down,
>>> network glitches, etc. The KafkaConsumer will just reconnect and retry
>>> sending that FetchRequest again. This won't cause duplicate messages in
>>> KafkaConsumer or Flink. Have you enabled exactly-once semantic for your
>>> Kafka sink? If not, the downstream might see duplicates in case of Flink
>>> failover or occasional retry in the KafkaProducer of the Kafka sink.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <ja...@gmail.com>
>>> wrote:
>>>
>>>> Any thoughts this doesn't seem to create duplicates all the time or
>>>> maybe it's unrelated as we are still seeing the message and there is no
>>>> duplicates...
>>>>
>>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <ja...@gmail.com>
>>>> wrote:
>>>>
>>>>> And yes my downstream is handling the duplicates in an idempotent way
>>>>> so we are good on that point. But just curious what the behaviour is on the
>>>>> source consumer when that error happens.
>>>>>
>>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <ja...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10
>>>>>> -21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler -
>>>>>> [Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>>
>>>>>> Obviously it looks like the consumer is getting disconnected and from
>>>>>> what it seems it's either a Kafka bug on the way it handles the EPOCH or
>>>>>> possibly version mismatch between client and brokers. That's fine I can
>>>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>>>> what happens in terms of the source and the sink. It looks let we get
>>>>>> duplicates on the sink and I'm guessing it's because the consumer is
>>>>>> failing and at that point Flink stays on that checkpoint until it can
>>>>>> reconnect and process that offset and hence the duplicates downstream?
>>>>>>
>>>>>

Re: What does Kafka Error sending fetch request mean for the Kafka source?

Posted by John Smith <ja...@gmail.com>.
Sorry, got confused with your reply... Does the message "Error sending
fetch request" cause retries/duplicates down stream or it doesn't?

I'm guessing it's even before the source can even send anything
downstream...


On Sat, 31 Oct 2020 at 09:10, John Smith <ja...@gmail.com> wrote:

> Hi my flow is Kafka Source -> Transform -> JDBC Sink
>
> Kafka Source is configured as at least once and JDBC prevents duplicates
> with unique key constraint and duplicate is logged in separate table. So
> the destination data is exactly once.
>
> The duplicates happen every so often, looking at check point history there
> was some checkpoints that failed, but the history isn't long enough to go
> back and look. I'm guessing I will have to adjust the checkpointing times a
> bit...
>
> On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <be...@gmail.com>
> wrote:
>
>> Hi John,
>>
>> The log message you saw from Kafka consumer simply means the consumer was
>> disconnected from the broker that FetchRequest was supposed to be sent to.
>> The disconnection can happen in many cases, such as broker down, network
>> glitches, etc. The KafkaConsumer will just reconnect and retry sending that
>> FetchRequest again. This won't cause duplicate messages in KafkaConsumer or
>> Flink. Have you enabled exactly-once semantic for your Kafka sink? If not,
>> the downstream might see duplicates in case of Flink failover or occasional
>> retry in the KafkaProducer of the Kafka sink.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Thu, Oct 22, 2020 at 11:38 PM John Smith <ja...@gmail.com>
>> wrote:
>>
>>> Any thoughts this doesn't seem to create duplicates all the time or
>>> maybe it's unrelated as we are still seeing the message and there is no
>>> duplicates...
>>>
>>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <ja...@gmail.com>
>>> wrote:
>>>
>>>> And yes my downstream is handling the duplicates in an idempotent way
>>>> so we are good on that point. But just curious what the behaviour is on the
>>>> source consumer when that error happens.
>>>>
>>>> On Wed, 21 Oct 2020 at 12:04, John Smith <ja...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-
>>>>> 21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [
>>>>> Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>>
>>>>> Obviously it looks like the consumer is getting disconnected and from
>>>>> what it seems it's either a Kafka bug on the way it handles the EPOCH or
>>>>> possibly version mismatch between client and brokers. That's fine I can
>>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>>> what happens in terms of the source and the sink. It looks let we get
>>>>> duplicates on the sink and I'm guessing it's because the consumer is
>>>>> failing and at that point Flink stays on that checkpoint until it can
>>>>> reconnect and process that offset and hence the duplicates downstream?
>>>>>
>>>>

Re: What does Kafka Error sending fetch request mean for the Kafka source?

Posted by John Smith <ja...@gmail.com>.
Hi my flow is Kafka Source -> Transform -> JDBC Sink

Kafka Source is configured as at least once and JDBC prevents duplicates
with unique key constraint and duplicate is logged in separate table. So
the destination data is exactly once.

The duplicates happen every so often, looking at check point history there
was some checkpoints that failed, but the history isn't long enough to go
back and look. I'm guessing I will have to adjust the checkpointing times a
bit...

On Thu., Oct. 29, 2020, 10:26 a.m. Becket Qin, <be...@gmail.com> wrote:

> Hi John,
>
> The log message you saw from Kafka consumer simply means the consumer was
> disconnected from the broker that FetchRequest was supposed to be sent to.
> The disconnection can happen in many cases, such as broker down, network
> glitches, etc. The KafkaConsumer will just reconnect and retry sending that
> FetchRequest again. This won't cause duplicate messages in KafkaConsumer or
> Flink. Have you enabled exactly-once semantic for your Kafka sink? If not,
> the downstream might see duplicates in case of Flink failover or occasional
> retry in the KafkaProducer of the Kafka sink.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Oct 22, 2020 at 11:38 PM John Smith <ja...@gmail.com>
> wrote:
>
>> Any thoughts this doesn't seem to create duplicates all the time or maybe
>> it's unrelated as we are still seeing the message and there is no
>> duplicates...
>>
>> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <ja...@gmail.com>
>> wrote:
>>
>>> And yes my downstream is handling the duplicates in an idempotent way so
>>> we are good on that point. But just curious what the behaviour is on the
>>> source consumer when that error happens.
>>>
>>> On Wed, 21 Oct 2020 at 12:04, John Smith <ja...@gmail.com> wrote:
>>>
>>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-
>>>> 21 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [
>>>> Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>>> org.apache.kafka.common.errors.DisconnectException.
>>>>
>>>> Obviously it looks like the consumer is getting disconnected and from
>>>> what it seems it's either a Kafka bug on the way it handles the EPOCH or
>>>> possibly version mismatch between client and brokers. That's fine I can
>>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>>> what happens in terms of the source and the sink. It looks let we get
>>>> duplicates on the sink and I'm guessing it's because the consumer is
>>>> failing and at that point Flink stays on that checkpoint until it can
>>>> reconnect and process that offset and hence the duplicates downstream?
>>>>
>>>

Re: What does Kafka Error sending fetch request mean for the Kafka source?

Posted by Becket Qin <be...@gmail.com>.
Hi John,

The log message you saw from Kafka consumer simply means the consumer was
disconnected from the broker that FetchRequest was supposed to be sent to.
The disconnection can happen in many cases, such as broker down, network
glitches, etc. The KafkaConsumer will just reconnect and retry sending that
FetchRequest again. This won't cause duplicate messages in KafkaConsumer or
Flink. Have you enabled exactly-once semantic for your Kafka sink? If not,
the downstream might see duplicates in case of Flink failover or occasional
retry in the KafkaProducer of the Kafka sink.

Thanks,

Jiangjie (Becket) Qin

On Thu, Oct 22, 2020 at 11:38 PM John Smith <ja...@gmail.com> wrote:

> Any thoughts this doesn't seem to create duplicates all the time or maybe
> it's unrelated as we are still seeing the message and there is no
> duplicates...
>
> On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <ja...@gmail.com>
> wrote:
>
>> And yes my downstream is handling the duplicates in an idempotent way so
>> we are good on that point. But just curious what the behaviour is on the
>> source consumer when that error happens.
>>
>> On Wed, 21 Oct 2020 at 12:04, John Smith <ja...@gmail.com> wrote:
>>
>>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-21
>>> 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [
>>> Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending
>>> fetch request (sessionId=806089934, epoch=INITIAL) to node 0:
>>> org.apache.kafka.common.errors.DisconnectException.
>>>
>>> Obviously it looks like the consumer is getting disconnected and from
>>> what it seems it's either a Kafka bug on the way it handles the EPOCH or
>>> possibly version mismatch between client and brokers. That's fine I can
>>> look at upgrading the client and/or Kafka. But I'm trying to understand
>>> what happens in terms of the source and the sink. It looks let we get
>>> duplicates on the sink and I'm guessing it's because the consumer is
>>> failing and at that point Flink stays on that checkpoint until it can
>>> reconnect and process that offset and hence the duplicates downstream?
>>>
>>

Re: What does Kafka Error sending fetch request mean for the Kafka source?

Posted by John Smith <ja...@gmail.com>.
Any thoughts this doesn't seem to create duplicates all the time or maybe
it's unrelated as we are still seeing the message and there is no
duplicates...

On Wed., Oct. 21, 2020, 12:09 p.m. John Smith, <ja...@gmail.com>
wrote:

> And yes my downstream is handling the duplicates in an idempotent way so
> we are good on that point. But just curious what the behaviour is on the
> source consumer when that error happens.
>
> On Wed, 21 Oct 2020 at 12:04, John Smith <ja...@gmail.com> wrote:
>
>> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-21
>> 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [
>> Consumer clientId=consumer-2, groupId=xxxxxx-import] Error sending fetch
>> request (sessionId=806089934, epoch=INITIAL) to node 0:
>> org.apache.kafka.common.errors.DisconnectException.
>>
>> Obviously it looks like the consumer is getting disconnected and from
>> what it seems it's either a Kafka bug on the way it handles the EPOCH or
>> possibly version mismatch between client and brokers. That's fine I can
>> look at upgrading the client and/or Kafka. But I'm trying to understand
>> what happens in terms of the source and the sink. It looks let we get
>> duplicates on the sink and I'm guessing it's because the consumer is
>> failing and at that point Flink stays on that checkpoint until it can
>> reconnect and process that offset and hence the duplicates downstream?
>>
>

Re: What does Kafka Error sending fetch request mean for the Kafka source?

Posted by John Smith <ja...@gmail.com>.
And yes my downstream is handling the duplicates in an idempotent way so we
are good on that point. But just curious what the behaviour is on the
source consumer when that error happens.

On Wed, 21 Oct 2020 at 12:04, John Smith <ja...@gmail.com> wrote:

> Hi, running Flink 1.10.0 we see these logs once in a while... 2020-10-21
> 15:48:57,625 INFO org.apache.kafka.clients.FetchSessionHandler - [Consumer
> clientId=consumer-2, groupId=xxxxxx-import] Error sending fetch request
> (sessionId=806089934, epoch=INITIAL) to node 0:
> org.apache.kafka.common.errors.DisconnectException.
>
> Obviously it looks like the consumer is getting disconnected and from what
> it seems it's either a Kafka bug on the way it handles the EPOCH or
> possibly version mismatch between client and brokers. That's fine I can
> look at upgrading the client and/or Kafka. But I'm trying to understand
> what happens in terms of the source and the sink. It looks let we get
> duplicates on the sink and I'm guessing it's because the consumer is
> failing and at that point Flink stays on that checkpoint until it can
> reconnect and process that offset and hence the duplicates downstream?
>