Posted to dev@flink.apache.org by "DONG, Weike" <ky...@connect.hku.hk> on 2020/09/03 12:15:08 UTC

Bug: Kafka producer always writes to partition 0, because KafkaSerializationSchemaWrapper does not call open() method of FlinkKafkaPartitioner

Hi community,

We have found a serious issue with the newly introduced
*KafkaSerializationSchemaWrapper* class, which causes *FlinkKafkaProducer*
to write only to partition 0 of the given Kafka topic under certain
conditions.

First, let's look at this constructor in the universal version of
*FlinkKafkaProducer*, which uses FlinkFixedPartitioner as the custom
partitioner.

[image: image.png]

When we trace down the call path, we see that
*KafkaSerializationSchemaWrapper* is used to wrap the aforementioned custom
partitioner, i.e. *FlinkFixedPartitioner*.

[image: image.png]

However, the implementation of *KafkaSerializationSchemaWrapper* never
calls the *open* method of the given partitioner, which the partitioner
needs in order to initialize its internal state, such as
*parallelInstanceId* in *FlinkFixedPartitioner*.

Therefore, when *KafkaSerializationSchemaWrapper#serialize* is later called
by the FlinkKafkaProducer, *FlinkFixedPartitioner#partition* always
returns 0, because *parallelInstanceId* is not properly initialized.
[image: image.png]

As a result, all of the data goes only to partition 0 of the given Kafka
topic, creating severe data skew in the sink.
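
To make the failure mode concrete, here is a minimal, self-contained sketch.
FixedPartitionerSketch is a simplified stand-in written for this
illustration, not the Flink source. It assumes the partitioner picks
partitions[parallelInstanceId % partitions.length] and that
parallelInstanceId is a plain int field, which Java initializes to 0 until
open() assigns it.

```java
// Simplified stand-in for FlinkFixedPartitioner (assumption, not the Flink source).
final class FixedPartitionerSketch {
    // Java initializes int fields to 0, so an instance whose open() was never
    // called behaves as if it belonged to subtask 0.
    private int parallelInstanceId;

    void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    int partition(int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}

final class FixedPartitionerDemo {
    private FixedPartitionerDemo() {}

    // Returns the partition chosen by each of `subtasks` sink subtasks.
    // Passing callOpen = false reproduces the missing open() call.
    static int[] chosenPartitions(int subtasks, int[] partitions, boolean callOpen) {
        int[] chosen = new int[subtasks];
        for (int subtask = 0; subtask < subtasks; subtask++) {
            FixedPartitionerSketch partitioner = new FixedPartitionerSketch();
            if (callOpen) {
                partitioner.open(subtask, subtasks);
            }
            chosen[subtask] = partitioner.partition(partitions);
        }
        return chosen;
    }
}
```

With 4 subtasks and partitions {0, 1, 2}, skipping open() yields partition 0
for every subtask, while calling open() spreads the subtasks over 0, 1, 2, 0.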

Re: Bug: Kafka producer always writes to partition 0, because KafkaSerializationSchemaWrapper does not call open() method of FlinkKafkaPartitioner

Posted by Ken Krugler <kk...@transpac.com>.
Assuming you’re not doing custom partitioning, another workaround is to pass Optional.empty() for the partitioner, so that Kafka’s own partitioning is used instead of a Flink partitioner.

Or at least that worked for us, when we encountered this same issue.
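
A rough sketch of why this workaround sidesteps the bug, assuming the wrapper
only consults a Flink partitioner when one is present and otherwise leaves
the record's partition unset so that the Kafka producer chooses it.
PartitionerSketch and WrapperPartitionChoice are hypothetical names used only
here; in real code the equivalent step is passing Optional.empty() as the
partitioner argument of FlinkKafkaProducer.

```java
// Hypothetical, simplified partitioner contract for this illustration only.
interface PartitionerSketch {
    int partition(int[] partitions);
}

final class WrapperPartitionChoice {
    private WrapperPartitionChoice() {}

    // Returns the explicit partition to put on the outgoing record, or null
    // when no Flink partitioner is configured. A null partition means the
    // Kafka producer's own partitioner decides where the record goes.
    static Integer choosePartition(
            java.util.Optional<PartitionerSketch> partitioner, int[] partitions) {
        if (partitioner.isPresent()) {
            return partitioner.get().partition(partitions);
        }
        return null;
    }
}
```

With an empty Optional no Flink partitioner is involved at all, so an
unopened partitioner can no longer pin every record to the first partition.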

— Ken

> On Sep 3, 2020, at 5:36 AM, Dawid Wysakowicz <dw...@apache.org> wrote:
> 
> Thank you for the thorough investigation. I totally agree with you. I created an issue for it [1]. I will try to fix it as soon as possible and include the fix in 1.11.2 and 1.12.
> You could work around this by using a KafkaSerializationSchema directly, together with the KafkaContextAware interface.
> Best,
> 
> Dawid
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19133

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Bug: Kafka producer always writes to partition 0, because KafkaSerializationSchemaWrapper does not call open() method of FlinkKafkaPartitioner

Posted by Dawid Wysakowicz <dw...@apache.org>.
Thank you for the thorough investigation. I totally agree with you. I
created an issue for it [1]. I will try to fix it as soon as possible and
include the fix in 1.11.2 and 1.12.

You could work around this by using a KafkaSerializationSchema directly,
together with the KafkaContextAware interface.
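
One possible shape of that workaround, sketched without Flink dependencies.
ContextAwareSketch is a hypothetical stand-in whose method names are modeled
on what KafkaContextAware appears to expose (please check the Javadoc of your
Flink version). The assumption is that the producer hands the schema its
subtask index and the topic's partitions before records are serialized, so
the schema can choose the partition itself and does not depend on
FlinkKafkaPartitioner#open being called.

```java
// Hypothetical stand-in for the context callbacks (assumption, not the Flink interface).
interface ContextAwareSketch {
    void setParallelInstanceId(int parallelInstanceId);

    void setNumParallelInstances(int numParallelInstances);

    void setPartitions(int[] partitions);
}

// A schema that keeps the context it is given and picks the partition itself.
final class SelfPartitioningSchema implements ContextAwareSketch {
    private int parallelInstanceId;
    private int[] partitions = new int[0];

    @Override
    public void setParallelInstanceId(int parallelInstanceId) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public void setNumParallelInstances(int numParallelInstances) {
        // Not needed for fixed partitioning; kept to mirror the callback set.
    }

    @Override
    public void setPartitions(int[] partitions) {
        this.partitions = partitions.clone();
    }

    // Partition to place on the outgoing record, same rule as a fixed partitioner.
    int targetPartition() {
        if (partitions.length == 0) {
            throw new IllegalStateException("setPartitions was not called");
        }
        return partitions[parallelInstanceId % partitions.length];
    }
}
```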

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-19133

On 03/09/2020 14:24, DONG, Weike wrote:
> Hi community,
>
> And by the way, during /FlinkKafkaProducer#initProducer/,
> the /flinkKafkaPartitioner/ is only opened when it is NOT null, which
> is unfortunately not the case here, because it is set to null
> if /KafkaSerializationSchemaWrapper/ is provided in the arguments of
> the constructor.
>
> image.png
> image.png
>
> So these logic flaws together lead to this serious bug, and we
> recommend initializing the FlinkKafkaPartitioner in
> KafkaSerializationSchemaWrapper#open.
>
> Sincerely,
> Weike

Re: Bug: Kafka producer always writes to partition 0, because KafkaSerializationSchemaWrapper does not call open() method of FlinkKafkaPartitioner

Posted by "DONG, Weike" <ky...@connect.hku.hk>.
Hi community,

And by the way, during *FlinkKafkaProducer#initProducer*, the
*flinkKafkaPartitioner* is only opened when it is NOT null, which is
unfortunately not the case here, because it is set to null if
*KafkaSerializationSchemaWrapper* is provided in the arguments of the
constructor.

[image: image.png]
[image: image.png]

So these logic flaws together lead to this serious bug, and we recommend
initializing the FlinkKafkaPartitioner in
KafkaSerializationSchemaWrapper#open.
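
The recommended change could look roughly like the sketch below. Both classes
are simplified, hypothetical stand-ins rather than the actual Flink classes
or the actual patch. The sketch assumes the wrapper already knows its subtask
index and parallelism by the time open() runs.

```java
// Simplified stand-in for a partitioner that must be opened (assumption).
final class OpenablePartitionerSketch {
    private int parallelInstanceId; // stays 0 until open() is called

    void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    int partition(int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}

// Simplified stand-in for the wrapper, with open() forwarding to the partitioner.
final class WrapperWithOpenSketch {
    private final OpenablePartitionerSketch partitioner; // may be null
    private int parallelInstanceId;
    private int numParallelInstances = 1;

    WrapperWithOpenSketch(OpenablePartitionerSketch partitioner) {
        this.partitioner = partitioner;
    }

    void setParallelInstanceId(int parallelInstanceId) {
        this.parallelInstanceId = parallelInstanceId;
    }

    void setNumParallelInstances(int numParallelInstances) {
        this.numParallelInstances = numParallelInstances;
    }

    // The proposed step: initialize the wrapped partitioner here.
    void open() {
        if (partitioner != null) {
            partitioner.open(parallelInstanceId, numParallelInstances);
        }
    }

    // Explicit partition for the outgoing record, or null if no partitioner is set.
    Integer partitionFor(int[] partitions) {
        if (partitioner == null) {
            return null;
        }
        return partitioner.partition(partitions);
    }
}
```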

Sincerely,
Weike


