Posted to user@flink.apache.org by Mohammed Kamaal <mo...@gmail.com> on 2021/08/24 07:21:49 UTC

Flink Performance Issue

Hi,

Apologies for the long message; I wanted to explain the issue in detail.

We have a Flink (version 1.8) application running on AWS Kinesis Analytics. The source is a Kafka topic with 15 partitions (AWS Managed Streaming for Kafka) and the sink is another Kafka topic with 15 partitions.

Each stream record is about 4 KB, so 20K records amount to roughly 20,000 * 4 KB ≈ 80 MB.

The application performs some complex business logic with the data and produces the output to the kafka topic.

In our performance test, processing 20K concurrent stream records (unique keys) currently takes about 25 minutes.

Our target is to process 20K concurrent stream records within 5 minutes.

I have reviewed the code and applied all the optimizations I could to the business logic, but I still don't see any improvement.

I tried increasing the parallelism from 5 to 8, but the throughput is the same with both 5 and 8.

I can also see that the stream is distributed across all 8 slots, though there is a lag of about 2K records between the first operator and the subsequent operators.

Checkpointing is enabled with the Kinesis Analytics default interval of one minute.

I have also tried setting a different parallelism for each of the operators.

Can you please suggest any other performance optimizations I should consider, or point out any mistake I am making here?

Here is my sample code
----------------------------------

StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(TOPIC_NAME, new ObjectNodeJsonDeSerializerSchema(), kafkaConnectProperties));
initialStreamData.print();

DataStream<POJO> rawDataProcess = initialStreamData.rebalance().flatMap(new ReProcessingDataProvider()).keyBy(value -> value.getPersonId());
rawDataProcess.print();

DataStream<POJO> cgmStream = rawDataProcess.keyBy(new ReProcessorKeySelector()).rebalance().flatMap(new SgStreamTask()); //the same person_id key
cgmStream.print();

DataStream<POJO> artfctOverlapStream = cgmStream.keyBy(new CGMKeySelector()).countWindow(2, 1)
.apply(new ArtifactOverlapProvider()); //the same person_id key
artfctOverlapStream.print();

DataStream<POJO> streamWithSgRoc = artfctOverlapStream.keyBy(new CGMKeySelector()).countWindow(7, 1)
.apply(new SgRocProvider()); // the same person_id key
streamWithSgRoc.print();

DataStream<POJO> cgmExcursionStream = streamWithSgRoc.keyBy(new CGMKeySelector())
.countWindow(Common.THREE, Common.ONE).apply(new CGMExcursionProviderStream()); //the same person_id key
cgmExcursionStream.print();

cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new CGMDataCollectorSchema(),
kafkaConnectProperties));

Thanks

Re: Flink Performance Issue

Posted by Arvid Heise <ar...@apache.org>.
Hi Kamaal,

I did a quick test with a local Kafka in Docker. With parallelism 1, I can
process 20k messages of size 4KB in about 1 min. So if you use a parallelism
of 15, I'd expect it to take below 10s even with bigger data skew.

What I recommend is to start from scratch and just work with a
simple source -> sink job. That should be much, much faster. If it is, you can
then add complexity back until you find the bottleneck.

If not, I suspect your ObjectNodeJsonDeSerializerSchema to be the issue.
For example, are you creating an ObjectMapper with each invocation? That's
a typical mistake.
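
For illustration, here is a minimal sketch of such a schema, assuming the
KeyedDeserializationSchema interface shown later in this thread (Flink 1.8) and
plain Jackson; the key point is that the ObjectMapper is created once per
schema instance and reused for every record:

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode> {

    // reused for every record; creating an ObjectMapper per message is expensive
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
        if (mapper == null) {
            // lazily initialized once per task after the schema instance is deserialized
            mapper = new ObjectMapper();
        }
        // assumes each message is a JSON object
        return (ObjectNode) mapper.readTree(message);
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}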

Best,

Arvid

On Mon, Sep 27, 2021 at 2:38 PM Mohammed Kamaal <mo...@gmail.com>
wrote:

> Hi Robert,
>
> I have removed all the business logic (keyBy and window) operator code and
> just had a source and sink to test it.
> The throughput is 20K messages in 2 minutes. It is a simple read from
> source (kafka topic) and write to sink (kafka topic). Don't you think 2
> minutes is also not a better throughput for a simple read/write
> application?. Each message is 4 KB.
>
> As I had mentioned in the previous email(s), I am using keyBy() and
> Window() to handle business logic. Do you think these operators would have
> a huge impact on the performance?. Or is it something to do with my Kafka
> cluster configuration or the older version of flink (1.8) that I am using
> in my application. Not sure if flink version 1.8 has a performance issue.
>
> Please let me know.
> Below is my kafka cluster configuration.
>
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=30000
> zookeeper.session.timeout.ms=18000
> log.retention.ms=172800000
> log.cleanup.policy=delete
> group.max.session.timeout.ms=1200000
>
>
>
> Thanks
>
> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger <rm...@apache.org>
> wrote:
>
>> Hi Kamaal,
>>
>> I would first suggest understanding the performance bottleneck, before
>> applying any optimizations.
>>
>> Idea 1: Are your CPUs fully utilized?
>> if yes, good, then scaling up will probably help
>> If not, then there's another inefficiency
>>
>> Idea 2: How fast can you get the data into your job, without any
>> processing?
>> You can measure this by submitting a simple Flink job that just reads the
>> data and writes it to a discarding sink. Either disable the operator
>> chaining to get metrics for the records per second, or add a custom mapper
>> in between that measures the throughput.
>> Ideally you see here that you can read all your data in a few seconds, if
>> not, then there's a problem getting your data in.
>>
>> Idea 3: is your IO fully utilized ?( if you are checkpointing to RocksDB,
>> the disk can dramatically slow you down)
>> Idea 4: Are you under high memory pressure, and your JVMs are spending
>> most of their cycles garbage collecting?
>>
>> My bet is you are not getting data into your cluster as fast as you think
>> (Idea 2)
>>
>>
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
>> mohammed.kamaal.k@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> The throughput has decreased further after I removed all the
>>> rebalance(). The performance has decreased from 14 minutes for 20K messages
>>> to 20 minutes for 20K messages.
>>>
>>> Below are the tasks that the flink application is performing. I am using
>>> keyBy and Window operation. Do you think am I making any mistake here or
>>> the way I am performing the keyBy or Window operation needs to be
>>> corrected?.
>>>
>>> //Add Source
>>> StreamExecutionEnvironment streamenv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> initialStreamData = streamenv.addSource(new
>>> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>>> new *ObjectNodeJsonDeSerializerSchema()*,
>>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream<CGM> cgmStreamData = initialStreamData.keyBy(value ->
>>> value.findValue("PERSON_ID").asText())
>>> .flatMap(new *SgStreamingTask()*).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream<CGM> artfctOverlapStream = cgmStreamData.keyBy(new
>>> CGMKeySelector()).countWindow(2, 1)
>>> .apply(new *ArtifactOverlapProvider()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream<CGM> streamWithSgRoc = artfctOverlapStream.keyBy(new
>>> CGMKeySelector()).countWindow(7, 1)
>>> .apply(new *SgRocProvider()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream<CGMDataCollector> cgmExcursionStream =
>>> streamWithSgRoc.keyBy(new CGMKeySelector())
>>> .countWindow(Common.THREE, Common.ONE).apply(new
>>> *CGMExcursionProviderStream()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> //Add Sink
>>> cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
>>> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
>>> CGMDataCollectorSchema(),
>>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>>
>>> *Implementation classes:-*
>>>
>>> //deserialize the json message received
>>> *ObjectNodeJsonDeSerializerSchema* implements
>>> KeyedDeserializationSchema<ObjectNode>{
>>> public ObjectNode deserialize(byte[] messageKey, byte[] message, String
>>> topic, int partition, long offset);
>>> }
>>>
>>> //Flapmap to check each message and apply validation
>>> public class *SgStreamingTask* extends RichFlatMapFunction<ObjectNode,
>>> CGM> {
>>> void flatMap(ObjectNode streamData, Collector<CGM> out);
>>> }
>>>
>>> //persist three state variables and apply business logic
>>> public class *ArtifactOverlapProvider* extends RichFlatMapFunction<CGM,
>>> Tuple2<Long, Long>>
>>> implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>>> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
>>> Collector<CGM> out);
>>> }
>>>
>>> //Apply business logic
>>> public class *SgRocProvider* implements WindowFunction<CGM, CGM,
>>> String, GlobalWindow>{
>>> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
>>> Collector<CGM> out);
>>> }
>>>
>>> //persist 3 state variables and apply business logic
>>> public class *CGMExcursionProviderStream* extends
>>> RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>>> implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow>{
>>> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
>>> Collector<CGMDataCollector> out);
>>>
>>> }
>>>
>>> Thanks
>>> Kamaal
>>>
>>>
>>> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Mohammed,
>>>>
>>>> something is definitely wrong in your setup. You can safely say that
>>>> you can process 1k records per second and core with Kafka and light
>>>> processing, so you shouldn't even need to go distributed in your case.
>>>>
>>>> Do you perform any heavy computation? What is your flatMap doing? Are
>>>> you emitting lots of small records from one big record?
>>>>
>>>> Can you please remove all rebalance and report back? Rebalance is
>>>> counter-productive if you don't exactly know that you need it.
>>>>
>>>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <
>>>> mohammed.kamaal.k@gmail.com> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Just an update,
>>>>>
>>>>> Problem 2:-
>>>>> ----------------
>>>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>>>> It is resolved. It was because we exceeded the number of allowed
>>>>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>>>>> unused topics and partitions to resolve the issue.
>>>>>
>>>>> Problem 1:-
>>>>> ----------------
>>>>> I increased the kafka partition and flink parallelism to 45 and the
>>>>> throughput has improved from 20 minutes to 14 minutes (20K records).
>>>>> Can you check the flink graph and let me know if there is anything
>>>>> else that can be done here to improve the throughput further.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>>>>> <mo...@gmail.com> wrote:
>>>>> >
>>>>> > Hi Fabian,
>>>>> >
>>>>> > Problem 1:-
>>>>> > ---------------------
>>>>> > I have removed the print out sink's and ran the test again. This time
>>>>> > the throughput is 17 minutes for 20K records (200 records every
>>>>> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
>>>>> > and kafka partition of 15)
>>>>> >
>>>>> > Please find the attached application graph. Can you suggest what else
>>>>> > is required further to improve the throughput.
>>>>> >
>>>>> > Problem 2:-
>>>>> > ---------------------
>>>>> > Also, I tried to increase the parallelism to 45 from 15 (also
>>>>> > increasing the kafka partition to 45 from 15) to see if this helps in
>>>>> > getting a better throughput.
>>>>> >
>>>>> > After increasing the partition, I am facing the Network issue with
>>>>> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
>>>>> > with 15 partitions for the kafka topic. This could be an issue with
>>>>> > the Kafka cluster?
>>>>> >
>>>>> > Kafka Cluster Configuration:-
>>>>> > ---------------------------------------
>>>>> > auto.create.topics.enable=true
>>>>> > log.retention.hours=24
>>>>> > default.replication.factor=3
>>>>> > min.insync.replicas=2
>>>>> > num.io.threads=45
>>>>> > num.network.threads=60
>>>>> > num.partitions=45
>>>>> > num.replica.fetchers=2
>>>>> > unclean.leader.election.enable=true
>>>>> > replica.lag.time.max.ms=30000
>>>>> > zookeeper.session.timeout.ms=18000
>>>>> > log.retention.ms=172800000
>>>>> > log.cleanup.policy=delete
>>>>> > group.max.session.timeout.ms=1200000
>>>>> >
>>>>> > Exception:-
>>>>> > ----------------
>>>>> >  "locationInformation":
>>>>> >
>>>>> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>>>>> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>>>>> >     "message": "Error during disposal of stream operator.",
>>>>> >     "throwableInformation": [
>>>>> >
>>>>>  "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>>>> > Failed to send data to Kafka: Failed to send data to Kafka: The
>>>>> server
>>>>> > disconnected
>>>>> >
>>>>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>>>>> > server disconnected before a response was received."
>>>>> >
>>>>> >
>>>>> > Thanks
>>>>> >
>>>>> >
>>>>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <
>>>>> fabianpaul@ververica.com> wrote:
>>>>> > >
>>>>> > > Hi Mohammed,
>>>>> > >
>>>>> > > 200records should definitely be doable. The first you can do is
>>>>> remove the print out Sink because they are increasing the load on your
>>>>> cluster due to the additional IO
>>>>> > > operation and secondly preventing Flink from fusing operators.
>>>>> > > I am interested to see the updated job graph after the removal of
>>>>> the print sinks.
>>>>> > >
>>>>> > > Best,
>>>>> > > Fabian
>>>>>
>>>>

Re: Flink Performance Issue

Posted by Mohammed Kamaal <mo...@gmail.com>.
Hi Robert,

I have removed all the business logic (keyBy and window) operator code and just had a source and sink to test it.
The throughput is 20K messages in 2 minutes. It is a simple read from the source (Kafka topic) and write to the sink (Kafka topic). Don't you think 2 minutes is still poor throughput for a simple read/write application? Each message is 4 KB.

As I mentioned in the previous email(s), I am using keyBy() and window() to handle the business logic. Do you think these operators have a huge impact on performance? Or is it something to do with my Kafka cluster configuration, or with the older version of Flink (1.8) that I am using in my application? I am not sure whether Flink 1.8 has a performance issue.

Please let me know.
Below is my kafka cluster configuration.

auto.create.topics.enable=true
log.retention.hours=24
default.replication.factor=3
min.insync.replicas=2
num.io.threads=45
num.network.threads=60
num.partitions=45
num.replica.fetchers=2
unclean.leader.election.enable=true
replica.lag.time.max.ms=30000
zookeeper.session.timeout.ms=18000
log.retention.ms=172800000
log.cleanup.policy=delete
group.max.session.timeout.ms=1200000



Thanks

> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger <rm...@apache.org> wrote:
> Hi Kamaal,
> 
> I would first suggest understanding the performance bottleneck, before applying any optimizations.
> 
> Idea 1: Are your CPUs fully utilized?
> if yes, good, then scaling up will probably help
> If not, then there's another inefficiency
> 
> Idea 2: How fast can you get the data into your job, without any processing?
> You can measure this by submitting a simple Flink job that just reads the data and writes it to a discarding sink. Either disable the operator chaining to get metrics for the records per second, or add a custom mapper in between that measures the throughput.
> Ideally you see here that you can read all your data in a few seconds, if not, then there's a problem getting your data in.
> 
> Idea 3: is your IO fully utilized ?( if you are checkpointing to RocksDB, the disk can dramatically slow you down)
> Idea 4: Are you under high memory pressure, and your JVMs are spending most of their cycles garbage collecting?
> 
> My bet is you are not getting data into your cluster as fast as you think (Idea 2)
> 
> 
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <mo...@gmail.com> wrote:
>> Hi Arvid,
>> 
>> The throughput has decreased further after I removed all the rebalance(). The performance has decreased from 14 minutes for 20K messages to 20 minutes for 20K messages.
>> 
>> Below are the tasks that the flink application is performing. I am using keyBy and Window operation. Do you think am I making any mistake here or the way I am performing the keyBy or Window operation needs to be corrected?.
>> 
>> //Add Source
>> StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
>> initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>> new ObjectNodeJsonDeSerializerSchema(), kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>> 
>> DataStream<CGM> cgmStreamData = initialStreamData.keyBy(value -> value.findValue("PERSON_ID").asText())
>> .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);
>> 
>> DataStream<CGM> artfctOverlapStream = cgmStreamData.keyBy(new CGMKeySelector()).countWindow(2, 1)
>> .apply(new ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> DataStream<CGM> streamWithSgRoc = artfctOverlapStream.keyBy(new CGMKeySelector()).countWindow(7, 1)
>> .apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc.keyBy(new CGMKeySelector())
>> .countWindow(Common.THREE, Common.ONE).apply(new CGMExcursionProviderStream()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> //Add Sink
>> cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
>> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new CGMDataCollectorSchema(),
>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>> 
>> Implementation classes:-
>> 
>> //deserialize the json message received
>> ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode>{
>> public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
>> }
>> 
>> //Flapmap to check each message and apply validation
>> public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
>> void flatMap(ObjectNode streamData, Collector<CGM> out);
>> }
>> 
>> //persist three state variables and apply business logic
>> public class ArtifactOverlapProvider extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>> implements WindowFunction<CGM, CGM, String, GlobalWindow> {
>> public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
>> }
>> 
>> //Apply business logic
>> public class SgRocProvider implements WindowFunction<CGM, CGM, String, GlobalWindow>{
>> public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
>> }
>> 
>> //persist 3 state variables and apply business logic
>> public class CGMExcursionProviderStream extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
>> implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow>{
>> public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGMDataCollector> out);
>> 
>> }
>> 
>> Thanks
>> Kamaal
>> 
>> 
>>> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>>> Hi Mohammed,
>>> 
>>> something is definitely wrong in your setup. You can safely say that you can process 1k records per second and core with Kafka and light processing, so you shouldn't even need to go distributed in your case.
>>> 
>>> Do you perform any heavy computation? What is your flatMap doing? Are you emitting lots of small records from one big record?
>>> 
>>> Can you please remove all rebalance and report back? Rebalance is counter-productive if you don't exactly know that you need it.
>>> 
>>>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <mo...@gmail.com> wrote:
>>>> Hi Fabian,
>>>> 
>>>> Just an update,
>>>> 
>>>> Problem 2:-
>>>> ----------------
>>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>>> It is resolved. It was because we exceeded the number of allowed
>>>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>>>> unused topics and partitions to resolve the issue.
>>>> 
>>>> Problem 1:-
>>>> ----------------
>>>> I increased the kafka partition and flink parallelism to 45 and the
>>>> throughput has improved from 20 minutes to 14 minutes (20K records).
>>>> Can you check the flink graph and let me know if there is anything
>>>> else that can be done here to improve the throughput further.
>>>> 
>>>> Thanks
>>>> 
>>>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>>>> <mo...@gmail.com> wrote:
>>>> >
>>>> > Hi Fabian,
>>>> >
>>>> > Problem 1:-
>>>> > ---------------------
>>>> > I have removed the print out sink's and ran the test again. This time
>>>> > the throughput is 17 minutes for 20K records (200 records every
>>>> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
>>>> > and kafka partition of 15)
>>>> >
>>>> > Please find the attached application graph. Can you suggest what else
>>>> > is required further to improve the throughput.
>>>> >
>>>> > Problem 2:-
>>>> > ---------------------
>>>> > Also, I tried to increase the parallelism to 45 from 15 (also
>>>> > increasing the kafka partition to 45 from 15) to see if this helps in
>>>> > getting a better throughput.
>>>> >
>>>> > After increasing the partition, I am facing the Network issue with
>>>> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
>>>> > with 15 partitions for the kafka topic. This could be an issue with
>>>> > the Kafka cluster?
>>>> >
>>>> > Kafka Cluster Configuration:-
>>>> > ---------------------------------------
>>>> > auto.create.topics.enable=true
>>>> > log.retention.hours=24
>>>> > default.replication.factor=3
>>>> > min.insync.replicas=2
>>>> > num.io.threads=45
>>>> > num.network.threads=60
>>>> > num.partitions=45
>>>> > num.replica.fetchers=2
>>>> > unclean.leader.election.enable=true
>>>> > replica.lag.time.max.ms=30000
>>>> > zookeeper.session.timeout.ms=18000
>>>> > log.retention.ms=172800000
>>>> > log.cleanup.policy=delete
>>>> > group.max.session.timeout.ms=1200000
>>>> >
>>>> > Exception:-
>>>> > ----------------
>>>> >  "locationInformation":
>>>> > "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>>>> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>>>> >     "message": "Error during disposal of stream operator.",
>>>> >     "throwableInformation": [
>>>> >         "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>>> > Failed to send data to Kafka: Failed to send data to Kafka: The server
>>>> > disconnected
>>>> >
>>>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>>>> > server disconnected before a response was received."
>>>> >
>>>> >
>>>> > Thanks
>>>> >
>>>> >
>>>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fa...@ververica.com> wrote:
>>>> > >
>>>> > > Hi Mohammed,
>>>> > >
>>>> > > 200records should definitely be doable. The first you can do is remove the print out Sink because they are increasing the load on your cluster due to the additional IO
>>>> > > operation and secondly preventing Flink from fusing operators.
>>>> > > I am interested to see the updated job graph after the removal of the print sinks.
>>>> > >
>>>> > > Best,
>>>> > > Fabian

Re: Flink Performance Issue

Posted by Robert Metzger <rm...@apache.org>.
Hi Kamaal,

I would first suggest understanding the performance bottleneck, before
applying any optimizations.

Idea 1: Are your CPUs fully utilized?
If yes, good; then scaling up will probably help.
If not, then there's another inefficiency.

Idea 2: How fast can you get the data into your job, without any processing?
You can measure this by submitting a simple Flink job that just reads the
data and writes it to a discarding sink. Either disable the operator
chaining to get metrics for the records per second, or add a custom mapper
in between that measures the throughput.
Ideally you see here that you can read all your data in a few seconds, if
not, then there's a problem getting your data in.
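
For reference, such a measurement job could look roughly like the sketch below
(topic name, deserialization schema and Kafka properties are only placeholders
taken from the snippets in this thread):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining(); // so the WebUI reports records/s per operator

env.addSource(new FlinkKafkaConsumer<>(TOPIC_NAME,
        new ObjectNodeJsonDeSerializerSchema(), kafkaConnectProperties))
   .addSink(new DiscardingSink<>()); // org.apache.flink.streaming.api.functions.sink.DiscardingSink

env.execute("ingestion-throughput-test");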

Idea 3: Is your IO fully utilized? (If you are checkpointing to RocksDB,
the disk can dramatically slow you down.)
Idea 4: Are you under high memory pressure, and your JVMs are spending most
of their cycles garbage collecting?

My bet is you are not getting data into your cluster as fast as you think
(Idea 2)


On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
mohammed.kamaal.k@gmail.com> wrote:

> Hi Arvid,
>
> The throughput has decreased further after I removed all the rebalance().
> The performance has decreased from 14 minutes for 20K messages to 20
> minutes for 20K messages.
>
> Below are the tasks that the flink application is performing. I am using
> keyBy and Window operation. Do you think am I making any mistake here or
> the way I am performing the keyBy or Window operation needs to be
> corrected?.
>
> //Add Source
> StreamExecutionEnvironment streamenv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> initialStreamData = streamenv.addSource(new
> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
> new *ObjectNodeJsonDeSerializerSchema()*,
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> DataStream<CGM> cgmStreamData = initialStreamData.keyBy(value ->
> value.findValue("PERSON_ID").asText())
> .flatMap(new *SgStreamingTask()*).setParallelism(Common.FORTY_FIVE);
>
> DataStream<CGM> artfctOverlapStream = cgmStreamData.keyBy(new
> CGMKeySelector()).countWindow(2, 1)
> .apply(new *ArtifactOverlapProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream<CGM> streamWithSgRoc = artfctOverlapStream.keyBy(new
> CGMKeySelector()).countWindow(7, 1)
> .apply(new *SgRocProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream<CGMDataCollector> cgmExcursionStream =
> streamWithSgRoc.keyBy(new CGMKeySelector())
> .countWindow(Common.THREE, Common.ONE).apply(new
> *CGMExcursionProviderStream()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> //Add Sink
> cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
> CGMDataCollectorSchema(),
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> *Implementation classes:-*
>
> //deserialize the json message received
> *ObjectNodeJsonDeSerializerSchema* implements
> KeyedDeserializationSchema<ObjectNode>{
> public ObjectNode deserialize(byte[] messageKey, byte[] message, String
> topic, int partition, long offset);
> }
>
> //Flapmap to check each message and apply validation
> public class *SgStreamingTask* extends RichFlatMapFunction<ObjectNode,
> CGM> {
> void flatMap(ObjectNode streamData, Collector<CGM> out);
> }
>
> //persist three state variables and apply business logic
> public class *ArtifactOverlapProvider* extends RichFlatMapFunction<CGM,
> Tuple2<Long, Long>>
> implements WindowFunction<CGM, CGM, String, GlobalWindow> {
> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
> Collector<CGM> out);
> }
>
> //Apply business logic
> public class *SgRocProvider* implements WindowFunction<CGM, CGM, String,
> GlobalWindow>{
> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
> Collector<CGM> out);
> }
>
> //persist 3 state variables and apply business logic
> public class *CGMExcursionProviderStream* extends
> RichFlatMapFunction<CGM, Tuple2<Long, Long>>
> implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow>{
> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
> Collector<CGMDataCollector> out);
>
> }
>
> Thanks
> Kamaal
>
>
> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Mohammed,
>>
>> something is definitely wrong in your setup. You can safely say that you
>> can process 1k records per second and core with Kafka and light processing,
>> so you shouldn't even need to go distributed in your case.
>>
>> Do you perform any heavy computation? What is your flatMap doing? Are you
>> emitting lots of small records from one big record?
>>
>> Can you please remove all rebalance and report back? Rebalance is
>> counter-productive if you don't exactly know that you need it.
>>
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <
>> mohammed.kamaal.k@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Just an update,
>>>
>>> Problem 2:-
>>> ----------------
>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>> It is resolved. It was because we exceeded the number of allowed
>>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>>> unused topics and partitions to resolve the issue.
>>>
>>> Problem 1:-
>>> ----------------
>>> I increased the kafka partition and flink parallelism to 45 and the
>>> throughput has improved from 20 minutes to 14 minutes (20K records).
>>> Can you check the flink graph and let me know if there is anything
>>> else that can be done here to improve the throughput further.
>>>
>>> Thanks
>>>
>>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>>> <mo...@gmail.com> wrote:
>>> >
>>> > Hi Fabian,
>>> >
>>> > Problem 1:-
>>> > ---------------------
>>> > I have removed the print out sink's and ran the test again. This time
>>> > the throughput is 17 minutes for 20K records (200 records every
>>> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
>>> > and kafka partition of 15)
>>> >
>>> > Please find the attached application graph. Can you suggest what else
>>> > is required further to improve the throughput.
>>> >
>>> > Problem 2:-
>>> > ---------------------
>>> > Also, I tried to increase the parallelism to 45 from 15 (also
>>> > increasing the kafka partition to 45 from 15) to see if this helps in
>>> > getting a better throughput.
>>> >
>>> > After increasing the partition, I am facing the Network issue with
>>> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
>>> > with 15 partitions for the kafka topic. This could be an issue with
>>> > the Kafka cluster?
>>> >
>>> > Kafka Cluster Configuration:-
>>> > ---------------------------------------
>>> > auto.create.topics.enable=true
>>> > log.retention.hours=24
>>> > default.replication.factor=3
>>> > min.insync.replicas=2
>>> > num.io.threads=45
>>> > num.network.threads=60
>>> > num.partitions=45
>>> > num.replica.fetchers=2
>>> > unclean.leader.election.enable=true
>>> > replica.lag.time.max.ms=30000
>>> > zookeeper.session.timeout.ms=18000
>>> > log.retention.ms=172800000
>>> > log.cleanup.policy=delete
>>> > group.max.session.timeout.ms=1200000
>>> >
>>> > Exception:-
>>> > ----------------
>>> >  "locationInformation":
>>> >
>>> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>>> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>>> >     "message": "Error during disposal of stream operator.",
>>> >     "throwableInformation": [
>>> >
>>>  "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>> > Failed to send data to Kafka: Failed to send data to Kafka: The server
>>> > disconnected
>>> >
>>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>>> > server disconnected before a response was received."
>>> >
>>> >
>>> > Thanks
>>> >
>>> >
>>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fa...@ververica.com>
>>> wrote:
>>> > >
>>> > > Hi Mohammed,
>>> > >
>>> > > 200records should definitely be doable. The first you can do is
>>> remove the print out Sink because they are increasing the load on your
>>> cluster due to the additional IO
>>> > > operation and secondly preventing Flink from fusing operators.
>>> > > I am interested to see the updated job graph after the removal of
>>> the print sinks.
>>> > >
>>> > > Best,
>>> > > Fabian
>>>
>>

Re: Flink Performance Issue

Posted by Mohammed Kamaal <mo...@gmail.com>.
Hi Arvid,

The throughput has decreased further after I removed all the rebalance() calls: processing 20K messages now takes 20 minutes instead of 14 minutes.

Below are the tasks that the Flink application is performing. I am using keyBy and window operations. Do you think I am making any mistake here, or does the way I am performing the keyBy or window operations need to be corrected?

//Add Source
StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
new ObjectNodeJsonDeSerializerSchema(), kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);

DataStream<CGM> cgmStreamData = initialStreamData.keyBy(value -> value.findValue("PERSON_ID").asText())
.flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);

DataStream<CGM> artfctOverlapStream = cgmStreamData.keyBy(new CGMKeySelector()).countWindow(2, 1)
.apply(new ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE).rebalance();

DataStream<CGM> streamWithSgRoc = artfctOverlapStream.keyBy(new CGMKeySelector()).countWindow(7, 1)
.apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE).rebalance();

DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc.keyBy(new CGMKeySelector())
.countWindow(Common.THREE, Common.ONE).apply(new CGMExcursionProviderStream()).setParallelism(Common.FORTY_FIVE).rebalance();

//Add Sink
cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new CGMDataCollectorSchema(),
kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);

Implementation classes:-

//deserialize the json message received
public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema<ObjectNode> {
public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

//FlatMap to check each message and apply validation
public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
void flatMap(ObjectNode streamData, Collector<CGM> out);
}

//persist three state variables and apply business logic
public class ArtifactOverlapProvider extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
implements WindowFunction<CGM, CGM, String, GlobalWindow> {
public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
}

//Apply business logic
public class SgRocProvider implements WindowFunction<CGM, CGM, String, GlobalWindow>{
public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGM> out);
}

//persist 3 state variables and apply business logic
public class CGMExcursionProviderStream extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow>{
public void apply(String key, GlobalWindow window, Iterable<CGM> values, Collector<CGMDataCollector> out);

}

Thanks
Kamaal


> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
> Hi Mohammed,
> 
> something is definitely wrong in your setup. You can safely say that you can process 1k records per second and core with Kafka and light processing, so you shouldn't even need to go distributed in your case.
> 
> Do you perform any heavy computation? What is your flatMap doing? Are you emitting lots of small records from one big record?
> 
> Can you please remove all rebalance and report back? Rebalance is counter-productive if you don't exactly know that you need it.
> 
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <mo...@gmail.com> wrote:
>> Hi Fabian,
>> 
>> Just an update,
>> 
>> Problem 2:-
>> ----------------
>> Caused by: org.apache.kafka.common.errors.NetworkException
>> It is resolved. It was because we exceeded the number of allowed
>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>> unused topics and partitions to resolve the issue.
>> 
>> Problem 1:-
>> ----------------
>> I increased the kafka partition and flink parallelism to 45 and the
>> throughput has improved from 20 minutes to 14 minutes (20K records).
>> Can you check the flink graph and let me know if there is anything
>> else that can be done here to improve the throughput further.
>> 
>> Thanks
>> 
>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>> <mo...@gmail.com> wrote:
>> >
>> > Hi Fabian,
>> >
>> > Problem 1:-
>> > ---------------------
>> > I have removed the print out sink's and ran the test again. This time
>> > the throughput is 17 minutes for 20K records (200 records every
>> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
>> > and kafka partition of 15)
>> >
>> > Please find the attached application graph. Can you suggest what else
>> > is required further to improve the throughput.
>> >
>> > Problem 2:-
>> > ---------------------
>> > Also, I tried to increase the parallelism to 45 from 15 (also
>> > increasing the kafka partition to 45 from 15) to see if this helps in
>> > getting a better throughput.
>> >
>> > After increasing the partition, I am facing the Network issue with
>> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
>> > with 15 partitions for the kafka topic. This could be an issue with
>> > the Kafka cluster?
>> >
>> > Kafka Cluster Configuration:-
>> > ---------------------------------------
>> > auto.create.topics.enable=true
>> > log.retention.hours=24
>> > default.replication.factor=3
>> > min.insync.replicas=2
>> > num.io.threads=45
>> > num.network.threads=60
>> > num.partitions=45
>> > num.replica.fetchers=2
>> > unclean.leader.election.enable=true
>> > replica.lag.time.max.ms=30000
>> > zookeeper.session.timeout.ms=18000
>> > log.retention.ms=172800000
>> > log.cleanup.policy=delete
>> > group.max.session.timeout.ms=1200000
>> >
>> > Exception:-
>> > ----------------
>> >  "locationInformation":
>> > "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>> >     "message": "Error during disposal of stream operator.",
>> >     "throwableInformation": [
>> >         "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>> > Failed to send data to Kafka: Failed to send data to Kafka: The server
>> > disconnected
>> >
>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>> > server disconnected before a response was received."
>> >
>> >
>> > Thanks
>> >
>> >
>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fa...@ververica.com> wrote:
>> > >
>> > > Hi Mohammed,
>> > >
>> > > 200records should definitely be doable. The first you can do is remove the print out Sink because they are increasing the load on your cluster due to the additional IO
>> > > operation and secondly preventing Flink from fusing operators.
>> > > I am interested to see the updated job graph after the removal of the print sinks.
>> > >
>> > > Best,
>> > > Fabian

Re: Flink Performance Issue

Posted by Arvid Heise <ar...@apache.org>.
Hi Mohammed,

something is definitely wrong in your setup. You can safely assume that you
can process 1k records per second per core with Kafka and light processing,
so you shouldn't even need to go distributed in your case.

Do you perform any heavy computation? What is your flatMap doing? Are you
emitting lots of small records from one big record?

Can you please remove all rebalance() calls and report back? Rebalance is
counter-productive unless you know exactly that you need it.

On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <mo...@gmail.com>
wrote:

> Hi Fabian,
>
> Just an update,
>
> Problem 2:-
> ----------------
> Caused by: org.apache.kafka.common.errors.NetworkException
> It is resolved. It was because we exceeded the number of allowed
> partitions for the kafka cluster (AWS MSK cluster). Have deleted
> unused topics and partitions to resolve the issue.
>
> Problem 1:-
> ----------------
> I increased the kafka partition and flink parallelism to 45 and the
> throughput has improved from 20 minutes to 14 minutes (20K records).
> Can you check the flink graph and let me know if there is anything
> else that can be done here to improve the throughput further.
>
> Thanks
>
> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
> <mo...@gmail.com> wrote:
> >
> > Hi Fabian,
> >
> > Problem 1:-
> > ---------------------
> > I have removed the print out sink's and ran the test again. This time
> > the throughput is 17 minutes for 20K records (200 records every
> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
> > and kafka partition of 15)
> >
> > Please find the attached application graph. Can you suggest what else
> > is required further to improve the throughput.
> >
> > Problem 2:-
> > ---------------------
> > Also, I tried to increase the parallelism to 45 from 15 (also
> > increasing the kafka partition to 45 from 15) to see if this helps in
> > getting a better throughput.
> >
> > After increasing the partition, I am facing the Network issue with
> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
> > with 15 partitions for the kafka topic. This could be an issue with
> > the Kafka cluster?
> >
> > Kafka Cluster Configuration:-
> > ---------------------------------------
> > auto.create.topics.enable=true
> > log.retention.hours=24
> > default.replication.factor=3
> > min.insync.replicas=2
> > num.io.threads=45
> > num.network.threads=60
> > num.partitions=45
> > num.replica.fetchers=2
> > unclean.leader.election.enable=true
> > replica.lag.time.max.ms=30000
> > zookeeper.session.timeout.ms=18000
> > log.retention.ms=172800000
> > log.cleanup.policy=delete
> > group.max.session.timeout.ms=1200000
> >
> > Exception:-
> > ----------------
> >  "locationInformation":
> >
> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
> >     "message": "Error during disposal of stream operator.",
> >     "throwableInformation": [
> >         "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> > Failed to send data to Kafka: Failed to send data to Kafka: The server
> > disconnected
> >
> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
> > server disconnected before a response was received."
> >
> >
> > Thanks
> >
> >
> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fa...@ververica.com>
> wrote:
> > >
> > > Hi Mohammed,
> > >
> > > 200records should definitely be doable. The first you can do is remove
> the print out Sink because they are increasing the load on your cluster due
> to the additional IO
> > > operation and secondly preventing Flink from fusing operators.
> > > I am interested to see the updated job graph after the removal of the
> print sinks.
> > >
> > > Best,
> > > Fabian
>

Re: Flink Performance Issue

Posted by Mohammed Kamaal <mo...@gmail.com>.
Hi Fabian,

Just an update,

Problem 2:-
----------------
Caused by: org.apache.kafka.common.errors.NetworkException
It is resolved. It happened because we exceeded the number of allowed
partitions for the Kafka cluster (AWS MSK cluster). We deleted
unused topics and partitions to resolve the issue.

Problem 1:-
----------------
I increased the Kafka partitions and the Flink parallelism to 45, and the
throughput has improved from 20 minutes to 14 minutes (20K records).
Can you check the Flink graph and let me know if there is anything
else that can be done here to improve the throughput further?

Thanks

On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
<mo...@gmail.com> wrote:
>
> Hi Fabian,
>
> Problem 1:-
> ---------------------
> I have removed the print out sink's and ran the test again. This time
> the throughput is 17 minutes for 20K records (200 records every
> second). Earlier it was 20 minutes for 20K records. (parallelism 15
> and kafka partition of 15)
>
> Please find the attached application graph. Can you suggest what else
> is required further to improve the throughput.
>
> Problem 2:-
> ---------------------
> Also, I tried to increase the parallelism to 45 from 15 (also
> increasing the kafka partition to 45 from 15) to see if this helps in
> getting a better throughput.
>
> After increasing the partition, I am facing the Network issue with
> Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
> with 15 partitions for the kafka topic. This could be an issue with
> the Kafka cluster?
>
> Kafka Cluster Configuration:-
> ---------------------------------------
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=30000
> zookeeper.session.timeout.ms=18000
> log.retention.ms=172800000
> log.cleanup.policy=delete
> group.max.session.timeout.ms=1200000
>
> Exception:-
> ----------------
>  "locationInformation":
> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>     "message": "Error during disposal of stream operator.",
>     "throwableInformation": [
>         "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed to send data to Kafka: Failed to send data to Kafka: The server
> disconnected
>
> "Caused by: org.apache.kafka.common.errors.NetworkException: The
> server disconnected before a response was received."
>
>
> Thanks
>
>
> On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fa...@ververica.com> wrote:
> >
> > Hi Mohammed,
> >
> > 200records should definitely be doable. The first you can do is remove the print out Sink because they are increasing the load on your cluster due to the additional IO
> > operation and secondly preventing Flink from fusing operators.
> > I am interested to see the updated job graph after the removal of the print sinks.
> >
> > Best,
> > Fabian

Re: Flink Performance Issue

Posted by Mohammed Kamaal <mo...@gmail.com>.
Hi Fabian,

Problem 1:-
---------------------
I have removed the print sinks and ran the test again. This time
the throughput is 17 minutes for 20K records (input arriving at 200
records per second). Earlier it was 20 minutes for 20K records
(parallelism 15 and 15 Kafka partitions).

Please find the attached application graph. Can you suggest what else
is required to improve the throughput further?

Problem 2:-
---------------------
Also, I tried to increase the parallelism from 15 to 45 (also
increasing the Kafka partitions from 15 to 45) to see if this helps in
getting better throughput.

After increasing the partitions, I am facing a network issue with the
Kafka cluster (AWS Managed Streaming for Kafka). I do not get this issue
with 15 partitions for the Kafka topic. Could this be an issue with
the Kafka cluster?

Kafka Cluster Configuration:-
---------------------------------------
auto.create.topics.enable=true
log.retention.hours=24
default.replication.factor=3
min.insync.replicas=2
num.io.threads=45
num.network.threads=60
num.partitions=45
num.replica.fetchers=2
unclean.leader.election.enable=true
replica.lag.time.max.ms=30000
zookeeper.session.timeout.ms=18000
log.retention.ms=172800000
log.cleanup.policy=delete
group.max.session.timeout.ms=1200000

Exception:-
----------------
 "locationInformation":
"org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
    "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
    "message": "Error during disposal of stream operator.",
    "throwableInformation": [
        "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
Failed to send data to Kafka: Failed to send data to Kafka: The server
disconnected

"Caused by: org.apache.kafka.common.errors.NetworkException: The
server disconnected before a response was received."


Thanks


On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fa...@ververica.com> wrote:
>
> Hi Mohammed,
>
> 200records should definitely be doable. The first you can do is remove the print out Sink because they are increasing the load on your cluster due to the additional IO
> operation and secondly preventing Flink from fusing operators.
> I am interested to see the updated job graph after the removal of the print sinks.
>
> Best,
> Fabian

Re: Flink Performance Issue

Posted by Fabian Paul <fa...@ververica.com>.
Hi Mohammed,

200 records/s should definitely be doable. The first thing you can do is remove the print sinks, because they increase the load on your cluster due to the additional IO
and also prevent Flink from fusing operators.
I am interested to see the updated job graph after the removal of the print sinks.

Best,
Fabian

Re: Flink Performance Issue

Posted by Mohammed Kamaal <mo...@gmail.com>.
Hi Fabian,

Thanks for your response.

- What do you mean with 20k concurrent stream data, 20k records per
second? - It is 200 records per second, which is 20K records in 100
seconds.
- How many taskmanagers are you using and how are the slots
distributed? - There are 8 task managers, each with 1 slot (total of 8
slots).
- Can you check the Flink WebUI if some operators are idle and maybe
share the image of the job graph? - Please find the graph image and
sub-task image below.
- How did you notice the lag of 2k between the operators? - I noticed
the lag when I expanded the sub-tasks in the WebUI: at one point the
first operator had processed 4K records while the next consecutive
operators were still at 2K.

1) Is there any alternative to 'keyBy' that avoids this costly operation?
2) As I mentioned, checkpointing is enabled and the interval is 1
minute. Do you suggest that I increase the interval? Would that
help here?
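
For context, the equivalent checkpoint settings in code would look roughly like
the sketch below (Kinesis Data Analytics manages these values for us, so this
is only illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // current interval: one checkpoint per minute
// leave some processing time between checkpoints if the interval is ever reduced
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);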

Thanks





> On Tue, Aug 24, 2021 at 1:08 PM Fabian Paul <fa...@ververica.com> wrote:
>
> Hi Mohammed,
>
> Without diving too much into your business logic a thing which catches my eye is the partitiong you are using. In general all
> calls to`keyBy`or `rebalance` are very expensive because all the data is shuffled across down- stream tasks. Flink tries to
> fuse operators with the same keyGroups together that there is no communication overhead between them but this is not
> possible if a shuffle is between them
> One example would be your cgmStream which first is distributed by a specified key and rebalance right after it.
> When applying `keyBy` operation it is important to understand how the key distribution in your input data looks like. It may
> happen that specific keys occur very very and some others appear with a less likelihood this also can cause a skew in your
> pipeline which cannot be resolved with a higher parallelism (some tasks are overloaded, some are idle).
>
> I also have a couple of followup questions to better understand your setup
>
> - What do you mean with 20k concurrent stream data, 20k records per second?
> - How many taskmanagers are you using and how are the slots distributed?
> - Can you check the Flink WebUI if some operators are idle and maybe share the image of the job graph?
> - How did you notice the lag of 2k between the operators?
>
> Best,
> Fabian
>
>

Re: Flink Performance Issue

Posted by Fabian Paul <fa...@ververica.com>.
Hi Mohammed,

Without diving too much into your business logic, one thing that catches my eye is the partitioning you are using. In general, all
calls to `keyBy` or `rebalance` are very expensive because all the data is shuffled across downstream tasks. Flink tries to
fuse operators with the same keyGroups together so that there is no communication overhead between them, but this is not
possible if a shuffle sits between them.
One example is your cgmStream, which is first partitioned by a specified key and then rebalanced right after it (see the sketch below).
When applying a `keyBy` operation it is important to understand what the key distribution of your input data looks like. It may
happen that specific keys occur very often while others appear with a lower likelihood; this can also cause skew in your
pipeline which cannot be resolved with a higher parallelism (some tasks are overloaded, some are idle).
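
As an illustration only, using the operator names from your earlier snippet: the
rebalance() after a keyed window re-shuffles the already keyed data and prevents
fusing, so the keyed steps can simply be chained without it:

// current shape: keyBy -> countWindow -> apply -> rebalance (extra shuffle)
DataStream<CGM> artfctOverlapStream = cgmStreamData
        .keyBy(new CGMKeySelector())
        .countWindow(2, 1)
        .apply(new ArtifactOverlapProvider())
        .rebalance(); // undoes the key partitioning again

// suggested shape: keep the stream keyed between the windowed operators
DataStream<CGM> artfctOverlap = cgmStreamData
        .keyBy(new CGMKeySelector())
        .countWindow(2, 1)
        .apply(new ArtifactOverlapProvider());

DataStream<CGM> streamWithSgRoc = artfctOverlap
        .keyBy(new CGMKeySelector())
        .countWindow(7, 1)
        .apply(new SgRocProvider());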

I also have a couple of follow-up questions to better understand your setup:

- What do you mean with 20k concurrent stream data, 20k records per second?
- How many taskmanagers are you using and how are the slots distributed?
- Can you check the Flink WebUI if some operators are idle and maybe share the image of the job graph?
- How did you notice the lag of 2k between the operators?

Best,
Fabian