Posted to dev@flink.apache.org by Zain Haider Nemati <za...@retailo.co> on 2022/05/15 22:12:35 UTC

Kinesis Sink - Data being received with intermittent breaks

Hi,
I'm fetching data from Kafka topics, converting it into chunks of <= 1 MB, and
sinking them to a Kinesis data stream.
The streaming job is functional; however, I see bursts of data in the Kinesis
stream with intermittent dips where the data received is 0. I'm attaching the
configuration parameters for the Kinesis sink. What could be the cause of this
issue?
The data is fed into the DataStream by a Kafka topic, which in turn is fed from
a MongoDB containing about 60 million records, all of which are loaded.
I am trying to configure the parameters so that the 1 MB per-record payload
limit of Kinesis is not breached. Would appreciate help on this!

producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
producerConfig.put("AggregationMaxCount", "3");
producerConfig.put("AggregationMaxSize", "256");
producerConfig.put("CollectionMaxCount", "3");
producerConfig.put("CollectionMaxSize", "100000");
producerConfig.put("AggregationEnabled", "true");
producerConfig.put("RateLimit", "50");
producerConfig.put("RecordMaxBufferedTime", "1000");
producerConfig.put("ThreadingModel", "POOLED");
FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(false);
kinesis.setDefaultStream("xxx");
kinesis.setDefaultPartition("0");
kinesis.setQueueLimit(1000);

*Data in Kinesis:*
[image: image.png]

Re: Kinesis Sink - Data being received with intermittent breaks

Posted by Danny Cranmer <da...@apache.org>.
Hi Zain,

Glad you found the problem, good luck!

Thanks,
Danny Cranmer


Re: Kinesis Sink - Data being received with intermittent breaks

Posted by Zain Haider Nemati <za...@retailo.co>.
Hi Danny,
I looked into it in more detail; the bottleneck seems to be the transform
function, which is running at 100% and causing backpressure. I'm looking
into that.
Thanks for your help, much appreciated!


Re: Kinesis Sink - Data being received with intermittent breaks

Posted by "Ber, Jeremy" <jd...@amazon.com>.
Hi Zain—

Are you seeing any data loss within the Flink Dashboard subtasks of each task? At the bottom of your dashboard you should see data flowing from each blue box to the next. Is this a comprehensive set of data? That is, do you see 80M records from the source -> first operator -> second operator -> sink?

Secondly, it may be easier to troubleshoot this by removing a few variables. Would you be able to remove the operator which segregates your data into length-100 records and simply forward the data to the next operator? Simultaneously, could you leave the Kinesis Producer configuration settings (apart from the queue limit) at their defaults? This will give a good baseline from which to improve.
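For reference, a stripped-back baseline along those lines might look roughly like the sketch below. This is only a sketch, not the actual job code: the region, credentials and stream name are placeholders, the SimpleStringSchema matches what was shared earlier in the thread, sourceStream stands in for the upstream DataStream<String>, and everything except the queue limit is left at the KPL defaults.

// Minimal sketch: FlinkKinesisProducer with default KPL settings; only the
// queue limit is set. Region, credentials and stream name are placeholders.
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");

FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setDefaultStream("xxx");
kinesis.setDefaultPartition("0");
kinesis.setQueueLimit(1000); // the one non-default setting

sourceStream.addSink(kinesis); // sourceStream: hypothetical upstream DataStream<String>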

Jeremy




Re: Kinesis Sink - Data being received with intermittent breaks

Posted by Zain Haider Nemati <za...@retailo.co>.
Hey Danny,
Thanks for getting back to me.
- You are seeing bursty throughput, but the job is keeping up? There is no
backpressure? --> Correct, I'm not seeing any backpressure in any of the
metrics.
- What is the throughput at the sink? --> Number of records out is about
1,100 per 10 seconds.
- On the graph screenshot, what is the period and stat (sum/average/etc)?
--> It is incoming data (MB/s) per second.

So let me explain this in totality: the source holds about 80 million
records, but the Kinesis data stream only shows about 20 million records
after it has consumed the data from the source. So I'm seeing a lot of data
loss, and I think this potentially has to do with the intermittent dips I'm
seeing in the records coming into the data stream.

What configurations do you generally suggest for data in this range when
sinking to a Kinesis data stream?


Re: Kinesis Sink - Data being received with intermittent breaks

Posted by Danny Cranmer <da...@apache.org>.
Hello Zain,

Thanks for providing the additional information. Going back to the original
issue:
- You are seeing bursty throughput, but the job is keeping up? There is no
backpressure?
- What is the throughput at the sink?
- On the graph screenshot, what is the period and stat (sum/average/etc)?

Let me shed some light on the log messages; let's take this example:

LogInputStreamReader ... Stage 1 Triggers ...  { stream:
'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3,
UserRecords: 6, KinesisRecords: 3 }

Flush trigger reason:
- manual: the flush was manually triggered
- count: flush was triggered by the number of records in the container
- size: the flush was triggered by the number of bytes in the container
- matches: the predicate was matched
- timed: the flush is triggered by elapsed timer

Input/Output:
- UserRecords: Number of input records KPL flushed (this can be higher than
KinesisRecords when aggregation is enabled)
- KinesisRecords: Number of records shipped to Kinesis Data Streams

The Stage 2 Triggers line tells us the number of API invocations via the PutRecords
field.

I can see from your logs that the majority of flushes are due to the timer,
and it does not look overly bursty. It seems to sit at around 3 records per 15
seconds, or 1 record every 5 seconds. This seems very low; is it expected?

Thanks,
Danny Cranmer


Re: Kinesis Sink - Data being received with intermittent breaks

Posted by Zain Haider Nemati <za...@retailo.co>.
Hey Danny,
Thanks for having a look at the issue.
I am using a custom Flink operator to segregate the data into a consistent
format of length 100, which is no more than 1 MB per record. The
configurations I shared were the result of my experimenting with some of the
settings to see whether they improve the throughput.
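For illustration, a rough sketch of this kind of chunking flatmap is shown below. It is not the actual operator from the job: the String record type, the character-based splitting, and the MAX_CHUNK_CHARS constant are simplified assumptions chosen only to keep each emitted chunk under the 1 MB Kinesis record limit for mostly single-byte payloads.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Illustrative only: splits each incoming record into chunks small enough to
// stay under the 1 MB Kinesis record limit. The real job's "length 100"
// segmentation rule is not reproduced here.
public class ChunkingFlatMap implements FlatMapFunction<String, String> {
    private static final int MAX_CHUNK_CHARS = 500_000; // hypothetical limit

    @Override
    public void flatMap(String value, Collector<String> out) {
        for (int start = 0; start < value.length(); start += MAX_CHUNK_CHARS) {
            int end = Math.min(start + MAX_CHUNK_CHARS, value.length());
            out.collect(value.substring(start, end));
        }
    }
}

Removing this operator and simply forwarding records, as Jeremy suggests elsewhere in the thread, is a quick way to check whether the chunking step itself is the bottleneck.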

Regarding your queries:
- Which Flink version is this? --> *Version 1.13*
- Can you see any errors in the Flink logs? --> *No, I'm attaching Flink
logs after setting all the configurations back to their defaults*
- Do you see any errors/throttles in the Kinesis Data Stream metrics?
--> *I was before segregating into smaller chunks, but not anymore*
- How many shards does your stream have? --> *It has 4 shards*
- What is your sink operator parallelism? --> *1*
- What is the general health of your job graph? --> *This is the only job
running at the moment; it isn't unhealthy*
  - Are the operators upstream of the sink backpressured? --> *No*
  - Are you sure the sink is actually the issue here? --> *I have used
.print() as a sink and I see all the records in real time; it chokes when
paired with the Kinesis sink*
  - Are there any other potential bottlenecks? --> *Data is coming in from
the source correctly; I have a flatmap transformation which reads it and
segments it into chunks of <= 1 MB, and this is also tested using the
.print() sink*
- When you say you are trying to achieve "1MB chunks", I assume this is per
Kinesis record, not per PutRecords batch? --> *Correct*

I'm attaching a small chunk of the log file from when the job is started.
[It goes down to 0 records for some periods of time as well; in the log file
it mostly shows between 3 and 6 records.]

I really appreciate your response on this, since I have not been able to
find much help from other resources online. It would be great if you could
let me know what the issue here might be, and let me know if you need
anything else as well!

Cheers


On Tue, May 17, 2022 at 12:34 AM Danny Cranmer <da...@apache.org>
wrote:

> Hello Zain,
>
> When you say "converting them to chunks of <= 1MB " does this mean you are
> creating these chunks in a custom Flink operator, or you are relying on
> the connector to do so? If you are generating your own chunks you can
> potentially disable Aggregation at the sink.
>
> Your throughput is incredibly bursty, I have a few questions:
> - Which Flink version is this?
> - Can you see any errors in the Flink logs?
> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
> - How many shards does your stream have?
> - What is your sink operator parallelism?
> - What is the general health of your job graph?
>   - Are the operators upstream of the sink backpressured?
>   - Are you sure the sink is actually the issue here?
>   - Are there any other potential bottlenecks?
> - When you say you are trying to achieve "1MB chunks", I assume this is
> per Kinesis record, not per PutRecords batch?
>
> Some comments on your configuration:
>
> As previously mentioned, if you are generating the chunks you can
> potentially remove the aggregation config and disable it.
> - producerConfig.put(“AggregationMaxCount”, “3”);
> - producerConfig.put(“AggregationMaxSize”, “256”);
> + producerConfig.put("AggregationEnabled”, “false”);
>
> This is very low, and could conflict with your chunk size. These
> configurations are regarding the PutRecords request, which has a quota of
> 500 records and 5MiB. You are setting the max size to 100kB, which is less
> than your largest chunk. I would recommend removing these configurations.
> - producerConfig.put(“CollectionMaxCount”, “3”);
> - producerConfig.put(“CollectionMaxSize”, “100000”);
>
> This is the default threading model, so can be removed.
> - producerConfig.put(“ThreadingModel”, “POOLED”);
>
> This config should not have too much impact. The default is 100ms, you are
> increasing to 1s. This could increase your end-to-end latency under low
> throughput scenarios.
> - producerConfig.put(“RecordMaxBufferedTime”, “1000");
>
> This config controls the sink backpressure and can also impact throughput.
> Do you see any logs like "Waiting for the queue length to drop below the
> limit takes unusually long, still not done after <x> attempts"?
> kinesis.setQueueLimit(1000);
>
> Thanks,
> Danny
>
> On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <
> alexanderpreuss@ververica.com> wrote:
>
>> Hi Zain,
>>
>> I'm looping in Danny here, he is probably the most knowledgeable when it
>> comes to the Kinesis connector.
>>
>> Best,
>> Alexander
>>
>> On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <
>> zain.haider@retailo.co> wrote:
>>
>>> Hi,
>>> Im fetching data from kafka topics converting them to chunks of <= 1MB
>>> and sinking them to a kinesis data stream.
>>> The streaming job is functional however I see bursts of data in kinesis
>>> stream with intermittent dips where data received is 0. I'm attaching the
>>> configuration parameters for kinesis sink. What could be the cause of this
>>> issue?
>>> The data is being fed into datastream by a kafka topic which is being
>>> fed in by a mongodb and has about 60 million records which are loaded fully.
>>> I am trying to configure parameters in such a way that the 1MB per data
>>> payload limit of kinesis is not breached. Would appreciate help on this !
>>>
>>> producerConfig.put(AWSConfigConstants.AWS_REGION, “xxx”);
>>>
>>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, “xxx");
>>>
>>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, “xxx”);
>>>                     producerConfig.put(“AggregationMaxCount”, “3”);
>>>                     producerConfig.put(“AggregationMaxSize”, “256”);
>>>                     producerConfig.put(“CollectionMaxCount”, “3”);
>>>                     producerConfig.put(“CollectionMaxSize”, “100000”);
>>>                     producerConfig.put(“AggregationEnabled”, true);
>>>                     producerConfig.put(“RateLimit”, “50");
>>>                     producerConfig.put(“RecordMaxBufferedTime”, “1000");
>>>                     producerConfig.put(“ThreadingModel”, “POOLED”);
>>>                     FlinkKinesisProducer<String> kinesis = new
>>> FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>>>                         kinesis.setFailOnError(false);
>>>                         kinesis.setDefaultStream(“xxx”);
>>>                         kinesis.setDefaultPartition(“0");
>>>                         kinesis.setQueueLimit(1000);
>>>
>>> *Data in Kinesis :*
>>> [image: image.png]
>>>
>>
>>
>> --
>>
>> Alexander Preuß | Engineer - Data Intensive Systems
>>
>> alexanderpreuss@ververica.com
>>
>> <https://www.ververica.com/>
>>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>>
>> Ververica GmbH
>>
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>
>> Managing Directors: Alexander Walden, Karl Anton Wehner, Yip Park Tung
>> Jason, Jinwei (Kevin) Zhang
>>
>>

Re: Kinesis Sink - Data being received with intermittent breaks

Posted by Danny Cranmer <da...@apache.org>.
Hello Zain,

When you say "converting them to chunks of <= 1MB " does this mean you are
creating these chunks in a custom Flink operator, or you are relying on
the connector to do so? If you are generating your own chunks you can
potentially disable Aggregation at the sink.

Your throughput is incredibly bursty; I have a few questions:
- Which Flink version is this?
- Can you see any errors in the Flink logs?
- Do you see any errors/throttles in the Kinesis Data Stream metrics?
- How many shards does your stream have?
- What is your sink operator parallelism?
- What is the general health of your job graph?
  - Are the operators upstream of the sink backpressured?
  - Are you sure the sink is actually the issue here?
  - Are there any other potential bottlenecks?
- When you say you are trying to achieve "1MB chunks", I assume this is per
Kinesis record, not per PutRecords batch?

Some comments on your configuration:

As previously mentioned, if you are generating the chunks you can
potentially remove the aggregation config and disable it.
- producerConfig.put(“AggregationMaxCount”, “3”);
- producerConfig.put(“AggregationMaxSize”, “256”);
+ producerConfig.put("AggregationEnabled”, “false”);

This is very low and could conflict with your chunk size. These
configurations apply to the PutRecords request, which has a quota of
500 records and 5 MiB. You are setting the max size to ~100 kB, which is
less than your largest chunk. I would recommend removing these
configurations.
- producerConfig.put(“CollectionMaxCount”, “3”);
- producerConfig.put(“CollectionMaxSize”, “100000”);
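
(For scale: with ~1 MB records, the 5 MiB request quota allows at most five
records per PutRecords call anyway, while a CollectionMaxSize of 100000
bytes is roughly 100 kB, i.e. smaller than a single chunk.)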

This is the default threading model, so it can be removed.
- producerConfig.put(“ThreadingModel”, “POOLED”);

This config should not have too much impact. The default is 100 ms; you are
increasing it to 1 s. This could increase your end-to-end latency under
low-throughput scenarios.
- producerConfig.put(“RecordMaxBufferedTime”, “1000");

This config controls the sink backpressure and can also impact throughput.
Do you see any logs like "Waiting for the queue length to drop below the
limit takes unusually long, still not done after <x> attempts"?
kinesis.setQueueLimit(1000);
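
Putting the suggestions above together, a trimmed-down configuration could
look roughly like the following. This is illustrative only; it assumes the
same imports and stream/credential placeholders as your snippet and leaves
the remaining producer settings at their defaults.

producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
// You already build <= 1 MB chunks upstream, so KPL aggregation is not needed.
producerConfig.put("AggregationEnabled", "false");
// CollectionMaxCount/CollectionMaxSize, RecordMaxBufferedTime, RateLimit
// and ThreadingModel are intentionally left at their defaults.

FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(false);
kinesis.setDefaultStream("xxx");
kinesis.setDefaultPartition("0");
// Keep the queue limit only after confirming it is not the bottleneck
// (watch for the "Waiting for the queue length to drop below the limit" log).
kinesis.setQueueLimit(1000);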

Thanks,
Danny

On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <
alexanderpreuss@ververica.com> wrote:

> Hi Zain,
>
> I'm looping in Danny here, he is probably the most knowledgeable when it
> comes to the Kinesis connector.
>
> Best,
> Alexander
>
> On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <
> zain.haider@retailo.co> wrote:
>
>> Hi,
>> Im fetching data from kafka topics converting them to chunks of <= 1MB
>> and sinking them to a kinesis data stream.
>> The streaming job is functional however I see bursts of data in kinesis
>> stream with intermittent dips where data received is 0. I'm attaching the
>> configuration parameters for kinesis sink. What could be the cause of this
>> issue?
>> The data is being fed into datastream by a kafka topic which is being fed
>> in by a mongodb and has about 60 million records which are loaded fully.
>> I am trying to configure parameters in such a way that the 1MB per data
>> payload limit of kinesis is not breached. Would appreciate help on this !
>>
>> producerConfig.put(AWSConfigConstants.AWS_REGION, “xxx”);
>>
>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, “xxx");
>>
>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, “xxx”);
>>                     producerConfig.put(“AggregationMaxCount”, “3”);
>>                     producerConfig.put(“AggregationMaxSize”, “256”);
>>                     producerConfig.put(“CollectionMaxCount”, “3”);
>>                     producerConfig.put(“CollectionMaxSize”, “100000”);
>>                     producerConfig.put(“AggregationEnabled”, true);
>>                     producerConfig.put(“RateLimit”, “50");
>>                     producerConfig.put(“RecordMaxBufferedTime”, “1000");
>>                     producerConfig.put(“ThreadingModel”, “POOLED”);
>>                     FlinkKinesisProducer<String> kinesis = new
>> FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>>                         kinesis.setFailOnError(false);
>>                         kinesis.setDefaultStream(“xxx”);
>>                         kinesis.setDefaultPartition(“0");
>>                         kinesis.setQueueLimit(1000);
>>
>> *Data in Kinesis :*
>> [image: image.png]
>>
>
>
> --
>
> Alexander Preuß | Engineer - Data Intensive Systems
>
> alexanderpreuss@ververica.com
>
> <https://www.ververica.com/>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
>
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>
> Managing Directors: Alexander Walden, Karl Anton Wehner, Yip Park Tung
> Jason, Jinwei (Kevin) Zhang
>
>

Re: Kinesis Sink - Data being received with intermittent breaks

Posted by Alexander Preuß <al...@ververica.com>.
Hi Zain,

I'm looping in Danny here, he is probably the most knowledgeable when it
comes to the Kinesis connector.

Best,
Alexander

On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <za...@retailo.co>
wrote:

> Hi,
> Im fetching data from kafka topics converting them to chunks of <= 1MB and
> sinking them to a kinesis data stream.
> The streaming job is functional however I see bursts of data in kinesis
> stream with intermittent dips where data received is 0. I'm attaching the
> configuration parameters for kinesis sink. What could be the cause of this
> issue?
> The data is being fed into datastream by a kafka topic which is being fed
> in by a mongodb and has about 60 million records which are loaded fully.
> I am trying to configure parameters in such a way that the 1MB per data
> payload limit of kinesis is not breached. Would appreciate help on this !
>
> producerConfig.put(AWSConfigConstants.AWS_REGION, “xxx”);
>
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, “xxx");
>
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, “xxx”);
>                     producerConfig.put(“AggregationMaxCount”, “3”);
>                     producerConfig.put(“AggregationMaxSize”, “256”);
>                     producerConfig.put(“CollectionMaxCount”, “3”);
>                     producerConfig.put(“CollectionMaxSize”, “100000”);
>                     producerConfig.put(“AggregationEnabled”, true);
>                     producerConfig.put(“RateLimit”, “50");
>                     producerConfig.put(“RecordMaxBufferedTime”, “1000");
>                     producerConfig.put(“ThreadingModel”, “POOLED”);
>                     FlinkKinesisProducer<String> kinesis = new
> FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>                         kinesis.setFailOnError(false);
>                         kinesis.setDefaultStream(“xxx”);
>                         kinesis.setDefaultPartition(“0");
>                         kinesis.setQueueLimit(1000);
>
> *Data in Kinesis :*
> [image: image.png]
>


-- 

Alexander Preuß | Engineer - Data Intensive Systems

alexanderpreuss@ververica.com

<https://www.ververica.com/>


Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH

Registered at Amtsgericht Charlottenburg: HRB 158244 B

Managing Directors: Alexander Walden, Karl Anton Wehner, Yip Park Tung
Jason, Jinwei (Kevin) Zhang