Posted to user@flink.apache.org by Alexander Smirnov <al...@gmail.com> on 2018/05/04 13:45:44 UTC

This server is not the leader for that topic-partition

Hi,

What could cause the following exception?

org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)


Thank you,
Alex
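The trace shows the failure surfacing from checkErroneous() inside invoke(), which suggests the send error was reported asynchronously and rethrown while a later record was being processed. Below is a minimal, self-contained model of that deferred-error pattern. It is an illustration under that assumption, not Flink's actual implementation; the class and method names (DeferredErrorSink, onSendCompleted, sentCount) are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of a sink that reports async send failures on the next invoke(). */
class DeferredErrorSink {
    // Holds the first error reported by an asynchronous send callback.
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();
    private int sent = 0;

    // Called by the (simulated) client I/O thread when a send completes.
    void onSendCompleted(Exception error) {
        if (error != null) {
            asyncException.compareAndSet(null, error);
        }
    }

    // Rethrows a previously recorded failure, so the *current* record's call fails.
    private void checkErroneous() {
        Exception e = asyncException.get();
        if (e != null) {
            throw new RuntimeException("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    void invoke(String record) {
        checkErroneous(); // surface an earlier failure first
        sent++;           // a real sink would hand the record to the Kafka client here
    }

    int sentCount() {
        return sent;
    }

    public static void main(String[] args) {
        DeferredErrorSink sink = new DeferredErrorSink();
        sink.invoke("a");
        // The callback for "a" fails after invoke("a") has already returned.
        sink.onSendCompleted(new Exception("This server is not the leader for that topic-partition."));
        try {
            sink.invoke("b");
        } catch (RuntimeException e) {
            // prints "Failed to send data to Kafka: This server is not the leader for that topic-partition."
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the record whose invoke() throws is not necessarily the record that failed, so the user frame at the bottom of the trace (FilterBMFunction.flatMap) only shows where the error was noticed.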

Re: This server is not the leader for that topic-partition

Posted by vino yang <ya...@gmail.com>.
Hi Jayant,

Could you try connecting to the Kafka 0.10.x server with
flink-connector-kafka-0.10 and see whether it still throws this exception?
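A hypothetical helper that encodes the rule of thumb from this thread: a 0.10.x broker paired with flink-connector-kafka-0.10, and (per Piotr's note elsewhere in the thread) FlinkKafkaProducer011 for 0.11.x and 1.0.x brokers. The mapping is an assumption drawn from the discussion, not an official compatibility matrix, and the artifact names omit the Scala suffix (e.g. _2.11).

```java
/** Hypothetical helper: suggests a Flink 1.4-era Kafka connector artifact for a broker version. */
class ConnectorChooser {
    static String suggestConnector(String brokerVersion) {
        String[] parts = brokerVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        if (major == 0 && minor == 10) {
            return "flink-connector-kafka-0.10";
        }
        if ((major == 0 && minor == 11) || (major == 1 && minor == 0)) {
            return "flink-connector-kafka-0.11";
        }
        // Versions not discussed in this thread get no suggestion.
        throw new IllegalArgumentException("No suggestion for broker version " + brokerVersion);
    }

    public static void main(String[] args) {
        // Broker version reported in this thread.
        System.out.println(suggestConnector("0.10.1.0")); // flink-connector-kafka-0.10
    }
}
```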

Thanks, vino.

On Tue, Sep 4, 2018 at 1:20 PM Jayant Ameta <wi...@gmail.com> wrote:

> Flink: 1.4.2
> flink-connector-kafka-0.11_2.11 (1.4.2)
> Kafka: 0.10.1.0
>
> Jayant Ameta

Re: This server is not the leader for that topic-partition

Posted by Jayant Ameta <wi...@gmail.com>.
Flink: 1.4.2
flink-connector-kafka-0.11_2.11 (1.4.2)
Kafka: 0.10.1.0

Jayant Ameta


On Tue, Sep 4, 2018 at 10:16 AM vino yang <ya...@gmail.com> wrote:

> Hi Jayant,
>
> Can you provide more specific information? For example, the version of
> your Flink, the version of kafka on which Flink-Kafka-Connector depends,
> and the version of kafka server.
>
> Thanks, vino.

Re: This server is not the leader for that topic-partition

Posted by vino yang <ya...@gmail.com>.
Hi Jayant,

Could you provide more specific information? For example: your Flink
version, the Kafka client version that the Flink Kafka connector depends on,
and the Kafka server version.

Thanks, vino.

On Tue, Sep 4, 2018 at 12:32 PM Jayant Ameta <wi...@gmail.com> wrote:

> I am getting the same error. Is there a way to retry/ignore instead of
> killing the job?
>
> Jayant Ameta

Re: This server is not the leader for that topic-partition

Posted by Jayant Ameta <wi...@gmail.com>.
I am getting the same error. Is there a way to retry or ignore it instead of
killing the job?
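A minimal sketch of the general idea behind "retry instead of failing the job": retry an operation a bounded number of times with backoff when the error is considered transient, and fail as before otherwise. Everything here (RetryingSender, callWithRetries, isTransient) is hypothetical, and classifying the not-leader message as transient is an assumption made for illustration.

```java
import java.util.function.Supplier;

/** Hypothetical bounded-retry wrapper with linear backoff (illustration only). */
class RetryingSender {
    static <T> T callWithRetries(Supplier<T> op, int maxRetries, long backoffMs) {
        int attempt = 0;
        while (true) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                if (!isTransient(e) || attempt >= maxRetries) {
                    throw e; // non-transient error, or retries exhausted: fail as before
                }
                attempt++;
                sleep(backoffMs * attempt); // linear backoff: backoffMs, 2*backoffMs, ...
            }
        }
    }

    // Assumption: treat the leader-change message as transient, because a resend
    // can succeed once the client knows the new leader.
    static boolean isTransient(RuntimeException e) {
        String msg = e.getMessage();
        return msg != null && msg.contains("not the leader");
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = callWithRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("This server is not the leader for that topic-partition.");
            }
            return "sent";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts"); // sent after 3 attempts
    }
}
```

In practice the Kafka producer already runs this kind of loop internally when its `retries` setting is above 0, which is usually a better place for it than user code, since the client also refreshes its partition metadata between attempts.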

Jayant Ameta


On Tue, May 22, 2018 at 7:58 PM gerardg <ge...@talaia.io> wrote:

> I've seen the same error while upgrading Kafka. We are using
> FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka
> 1.1.0, each time a server was restarted, an already running Flink job
> failed
> with the same message.
>
> Gerard

Re: This server is not the leader for that topic-partition

Posted by gerardg <ge...@talaia.io>.
I've seen the same error while upgrading Kafka. We are using
FlinkKafkaProducer011 and Kafka version 1.0.0. While upgrading to Kafka
1.1.0, each time a server was restarted, an already running Flink job failed
with the same message.

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: This server is not the leader for that topic-partition

Posted by Alexander Smirnov <al...@gmail.com>.
Thank you, Piotr.

On Mon, May 7, 2018 at 2:59 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Regardless if that will fix the problem or not, please consider upgrading
> to Kafka 0.11.0.2 or 1.0.1. Kafka 0.11.0 release was quite messy and it
> might be that the bug you have hit was fixed in 0.11.0.2.
>
> As a side note, as far as we know our FlinkKafkaProducer011 works fine
> with Kafka 1.0.x.
>
> Piotrek

Re: This server is not the leader for that topic-partition

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Regardless of whether it fixes the problem, please consider upgrading to Kafka 0.11.0.2 or 1.0.1. The Kafka 0.11.0 release was quite messy, and the bug you hit may have been fixed in 0.11.0.2.

As a side note, as far as we know our FlinkKafkaProducer011 works fine with Kafka 1.0.x.

Piotrek

> On 7 May 2018, at 12:12, Alexander Smirnov <al...@gmail.com> wrote:
> 
> Hi Piotr, using 0.11.0 Kafka version


Re: This server is not the leader for that topic-partition

Posted by Alexander Smirnov <al...@gmail.com>.
Hi Piotr, we are using Kafka 0.11.0.

On Sat, May 5, 2018 at 10:19 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> FlinkKafka011Producer uses Kafka 0.11.0.2.
>
> However I’m not sure if bumping KafkaProducer version solves this issue or
> upgrading Kafka. What Kafka version are you using?
>
> Piotrek

Re: This server is not the leader for that topic-partition

Posted by Piotr Nowojski <pi...@data-artisans.com>.
FlinkKafkaProducer011 uses the Kafka 0.11.0.2 client.

However, I'm not sure whether bumping the KafkaProducer version or upgrading the Kafka brokers is what solves this issue. Which Kafka version are you using?
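Whether a client already contains a fix released in 0.10.2.1 (the version cited from Stack Overflow in this thread) comes down to a dotted-version comparison against the client version FlinkKafkaProducer011 ships with, 0.11.0.2 according to this message. A minimal sketch; compareVersions is a hypothetical helper that assumes purely numeric components. Note that this only covers the client side; the broker version is a separate question.

```java
/** Hypothetical helper: compares dotted numeric versions such as "0.11.0.2" and "0.10.2.1". */
class VersionCheck {
    // Returns a negative number, zero, or a positive number as a is lower than, equal to, or higher than b.
    static int compareVersions(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            // Missing components count as 0, so "1.0" equals "1.0.0".
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // Client bundled with FlinkKafkaProducer011 vs. the fix version cited in the thread.
        System.out.println(compareVersions("0.11.0.2", "0.10.2.1") >= 0); // true
    }
}
```

Comparing components numerically matters here: a plain string comparison would order "0.11" before "0.2".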

Piotrek

> On 4 May 2018, at 17:55, Alexander Smirnov <al...@gmail.com> wrote:
> 
> Thanks for quick turnaround Stefan, Piotr
> 
> This is a rare reproducible issue and I will keep an eye on it
> 
> searching on the Stack Overflow I found https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash
> 
> They say that the problem is fixed in 0.10.2.1 of kafka producer so I wonder which version is used in FlinkKafkaProducer integration. For earlier versions it is proposed to use configuration:
> 
> final Properties props = new Properties();
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);  
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);


Re: This server is not the leader for that topic-partition

Posted by Alexander Smirnov <al...@gmail.com>.
Thanks for the quick turnaround, Stefan and Piotr.

This issue occurs only rarely, so I will keep an eye on it.

Searching Stack Overflow, I found:
https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash

According to that thread, the problem is fixed in version 0.10.2.1 of the
Kafka producer, so I wonder which version the FlinkKafkaProducer integration
uses. For earlier versions, the proposed configuration is:

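import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10); // producer: retry retriable send errors
// consumer settings
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);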
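Of these settings, only ProducerConfig.RETRIES_CONFIG affects a producer; MAX_POLL_INTERVAL_MS_CONFIG and SESSION_TIMEOUT_MS_CONFIG are consumer settings (the Stack Overflow case was a Kafka Streams application, which runs both). As far as I know, Kafka producers before 2.1 default `retries` to 0, so a not-leader error is passed straight back instead of being retried after a metadata refresh. Below is a sketch of producer-only Properties of the kind that could be passed to FlinkKafkaProducer011; it uses the producer config keys as string literals so it compiles without kafka-clients, and the values (and the helper name ProducerSettings) are illustrative assumptions, not tuned recommendations.

```java
import java.util.Properties;

/** Hypothetical builder for producer-only settings; values are illustrative, not tuned. */
class ProducerSettings {
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retry retriable errors such as a leader change instead of failing the send at once.
        props.setProperty("retries", "10");
        // Wait between retries, giving the client time to refresh partition metadata.
        props.setProperty("retry.backoff.ms", "500");
        // With retries enabled, one in-flight request per connection keeps records in order.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // "broker1:9092" is a placeholder address.
        Properties props = producerProps("broker1:9092");
        System.out.println(props.getProperty("retries")); // 10
    }
}
```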




On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> I think Stefan is right. Quick google search points to this:
> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition
>
> Please let us know if changing your configuration will solve the problem!
>
> Piotrek

Re: This server is not the leader for that topic-partition

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

I think Stefan is right. A quick Google search points to this: https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition

Please let us know if changing your configuration will solve the problem!

Piotrek

> On 4 May 2018, at 15:53, Stefan Richter <s....@data-artisans.com> wrote:
> 
> Hi,
> 
> I think in general this means that your producer client does not connect to the correct Broker (the leader) but to a broker that is just a follower and the follower can not execute that request. However, I am not sure what causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) has an idea?
> 
> Best,
> Stefan


Re: This server is not the leader for that topic-partition

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

In general, I think this means that your producer client is not connected to the correct broker (the partition leader) but to a broker that is only a follower, and a follower cannot execute that request. However, I am not sure what causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) has an idea?
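A toy model of the situation described here: a client with stale metadata sends to the old leader, which is now only a follower and rejects the write; after refreshing its metadata, the client reaches the new leader. Everything in it (ToyCluster, the broker ids, the topic name) is hypothetical and heavily simplified; it is not Kafka's protocol code.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of partition leadership and client-side metadata; not Kafka's implementation. */
class ToyCluster {
    // Authoritative leader per partition, as the cluster currently sees it.
    final Map<String, Integer> leaders = new HashMap<>();

    // A broker accepts a produce request only if it leads the partition.
    boolean produce(int brokerId, String partition) {
        return leaders.getOrDefault(partition, -1) == brokerId;
    }

    public static void main(String[] args) {
        ToyCluster cluster = new ToyCluster();
        cluster.leaders.put("events-0", 1);

        // The client caches metadata: partition events-0 is led by broker 1.
        Map<String, Integer> clientMetadata = new HashMap<>(cluster.leaders);

        // Broker 1 restarts (e.g. during an upgrade) and leadership moves to broker 2.
        cluster.leaders.put("events-0", 2);

        boolean first = cluster.produce(clientMetadata.get("events-0"), "events-0");
        System.out.println("send to cached leader: " + (first ? "ok" : "NOT_LEADER_FOR_PARTITION"));

        // Refreshing metadata and resending succeeds.
        clientMetadata.putAll(cluster.leaders);
        boolean second = cluster.produce(clientMetadata.get("events-0"), "events-0");
        System.out.println("send after refresh: " + (second ? "ok" : "NOT_LEADER_FOR_PARTITION"));
    }
}
```

This is consistent with the observation elsewhere in this thread that each broker restart during a Kafka upgrade failed the running job: a restart moves leadership, and presumably a producer that does not retry surfaces the first rejected send.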

Best,
Stefan

> On 4 May 2018, at 15:45, Alexander Smirnov <al...@gmail.com> wrote:
> 
> Hi,
> 
> what could cause the following exception?
> 
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> 	at com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
> 
> 
> Thank you,
> Alex