Posted to user@flink.apache.org by Rafi Aroch <ra...@gmail.com> on 2018/05/24 07:40:37 UTC

FlinkKinesisProducer weird behaviour

Hi,

We're using Kinesis as both the input and the output of a job and are experiencing a
parsing exception while reading from the output stream. All streams contain
only 1 shard.

While investigating the issue I noticed a weird behaviour where records get
a PartitionKey I did not assign, and the record Data is wrapped with
seemingly random illegal characters.

I wrote a very basic program to try to isolate the problem, but I still see
this happening:

   - I wrote a simple SourceFunction which generates messages of the
   pattern - <sequence#>-AAAAAAAAAAA\n
   - FlinkKinesisProducer writes the messages to the Kinesis stream with a
   default partitionKey of "0" - so I expect ALL records to have a partitionKey
   of "0"

To verify the records in the Kinesis stream I use AWS CLI get-records API
and see the following:

.......................
        {
            "SequenceNumber": "49584735873509122272926425626745413182361252610143420418",
            "ApproximateArrivalTimestamp": 1527144766.662,
            "Data": "84mawgoBMBpsCAAaaDc5LUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEKGmwIABpoODAtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQodBmhDDIwmRVeomHOIGlWJ",
            "PartitionKey": "a"
        },
        {
            "SequenceNumber": "49584735873509122272926425626746622108180867308037603330",
            "ApproximateArrivalTimestamp": 1527144766.86,
            "Data": "QUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQo=",
            "PartitionKey": "0"
        },
.......................


Where did PartitionKey "a" come from?

Furthermore, if you Base64-decode the Data of the records, you see
that all records written with this PartitionKey "a" are wrapped with weird
illegal characters.
For example:

$ echo 84mawgoBMBpsCAAaaDc5LUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEKGmwIABpoODAtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQodBmhDDIwmRVeomHOIGlWJ | base64 --decode
��
0h79-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
h80-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
hC
  �&EW��s�U�r
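
Those leading bytes are not random: decoding just the head of the "a" record exposes a fixed marker. A quick check in Python (the Base64 below is only the first 12 characters of that record's Data, enough to reveal the header):

```python
import base64

# First 12 Base64 characters of the "PartitionKey": "a" record's Data field,
# truncated here so that only the leading bytes are decoded.
head = base64.b64decode("84mawgoBMBps")

# The first four bytes are 0xF3 0x89 0x9A 0xC2 -- the magic number that the
# AWS KPL prepends to an aggregated record.
print(head[:4].hex())  # f3899ac2
```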


While the records with PartitionKey "0" look good:

$ echo ODEtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQo= | base64 --decode
81-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
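
The contrast is visible in the first bytes alone; a small Python check using the "0"-record payload from the example above:

```python
import base64

# Data of a record that got PartitionKey "0" -- a plain, non-aggregated record.
plain = base64.b64decode(
    "ODEtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQo="
)

# It decodes straight to the original message; no KPL header in front.
print(plain[:4].hex())     # 38312d41, i.e. the ASCII bytes of "81-A"
print(plain.decode()[:3])  # 81-
```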



I tried using both the 1.4.2 release & 1.6-SNAPSHOT and still see the issue...

Here is a link to the gist:
https://gist.github.com/aroch/7fb4219e7ada74f30654f1effe9d2f43

Am I missing anything? Has anyone encountered such an issue?

Would appreciate any help,

Rafi

Re: FlinkKinesisProducer weird behaviour

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

I’m glad that you have figured it out.

Unfortunately it’s almost impossible to mention in our documentation all of the quirks of the connectors that we are using, since that would more or less come down to fully copying their documentation :( However, I created a small PR that mentions this issue:

https://github.com/apache/flink/pull/6072

Please feel free to make further comments/suggestions there.

Thanks, Piotrek



Re: FlinkKinesisProducer weird behaviour

Posted by Rafi Aroch <ra...@gmail.com>.
Hi,

Thanks Piotr for your response.

I've further investigated the issue and found the root cause.

There are 2 possible ways to produce/consume records to/from Kinesis:

   1. Using the Kinesis Data Streams service API directly
   2. Using the KCL & KPL.

The FlinkKinesisProducer uses the AWS KPL to push records into Kinesis, for
optimized performance. One of the features of the KPL is Aggregation,
meaning that it batches many UserRecords into one Kinesis Record to
increase producer throughput.
The thing is that consumers of that stream need to be aware that the
records being consumed are aggregated, and handle them accordingly [1][2].
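
For a consumer that cannot pull in the KCL or AWS's deaggregation helpers, the aggregated wire format (a 4-byte magic number 0xF3899AC2, a small protobuf message, then a 16-byte MD5 checksum of that message) can also be unpacked by hand. A minimal sketch in Python — not the official deaggregation library, and covering only the fields that appear in this thread (the partition-key table and the record payloads):

```python
import base64
import hashlib

KPL_MAGIC = bytes.fromhex("f3899ac2")  # marker the KPL prepends to aggregated records

def read_varint(buf, pos):
    """Decode one protobuf varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7

def iter_fields(buf):
    """Yield (field_number, value) pairs from a protobuf-encoded buffer."""
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:                    # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:                  # length-delimited
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError("unsupported wire type: %d" % wire_type)
        yield field, value

def deaggregate(record_data):
    """Return [(partition_key, user_record_bytes), ...] for one Kinesis record."""
    if not record_data.startswith(KPL_MAGIC):
        # A plain record: pass it through, with no embedded partition key.
        return [(None, record_data)]
    message, digest = record_data[4:-16], record_data[-16:]
    if hashlib.md5(message).digest() != digest:
        raise ValueError("aggregated record failed its MD5 checksum")
    partition_keys, user_records = [], []
    for field, value in iter_fields(message):
        if field == 1:                        # partition_key_table entry
            partition_keys.append(value.decode())
        elif field == 3:                      # one aggregated Record submessage
            key_index, data = 0, b""
            for f, v in iter_fields(value):
                if f == 1:                    # partition_key_index
                    key_index = v
                elif f == 3:                  # the user record's payload
                    data = v
            user_records.append((partition_keys[key_index], data))
    return user_records

# The aggregated Data field from the get-records output earlier in the thread:
aggregated = base64.b64decode(
    "84mawgoBMBpsCAAaaDc5LUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUEKGmwIABpoODAtQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQQodBmhDDIwmRVeomHOIGlWJ"
)
for key, payload in deaggregate(aggregated):
    print(key, payload[:6])
# prints: 0 b'79-AAA'
#         0 b'80-AAA'
```

Running it over the "a" record recovers the two original messages with their intended partition key "0"; the surrounding "illegal characters" are just the protobuf framing and the trailing checksum.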

In my case, the output stream is being consumed by Druid <http://druid.io/>,
so the consumer code is not in my control...
My choices are therefore to disable the Aggregation feature by passing
aggregationEnable: false in the Kinesis configuration, or to write my own
custom consumer for Druid.

I think that we should state this as part of the documentation for Flink
Kinesis Connector.

[1] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html
[2] https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-integration.html

Thanks,
Rafi

On Thu, May 24, 2018 at 11:18 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Have you tried to write the same records, with exactly the same
> configuration to the Kinesis, but outside of Flink (with some standalone
> Java application)?
>
> Piotrek

Re: FlinkKinesisProducer weird behaviour

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Have you tried to write the same records, with exactly the same configuration, to Kinesis, but outside of Flink (with some standalone Java application)?

Piotrek
