Posted to user@flink.apache.org by Ning Shi <ni...@gmail.com> on 2018/08/27 02:30:42 UTC

Low Performance in High Cardinality Big Window Application

The application consumes from a single Kafka topic, deserializes the
JSON payload into POJOs, and uses a big keyed window (30+ days) for
deduplication, then emits the result for every single event to four
other keyed windows for aggregation. It looks roughly like the
following.

Source->KeyBy(A,B,C)
           |
           |                    -->KeyBy(A,B,C)->Hourly Window(sum)
           v                    |->KeyBy(A,B,C)->Daily Window(sum)
Big Window(sum, emit per event) -|
                                |->KeyBy(D,E)->Hourly Window(sum)
                                -->KeyBy(D,E)->Daily Window(sum)
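
In code, the topology is wired up roughly like the sketch below. This is
only a sketch under assumptions: the Event POJO and its fields, the Kafka
topic and connector version, the watermark bound, and the per-event
CountTrigger are placeholders for illustration, not the exact classes or
settings of our job.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class DedupTopologySketch {

    // Placeholder POJO; the real event schema is not shown in this thread.
    public static class Event {
        public String a, b, c, d, e;
        public long value;
        public long eventTime;
        public Event() {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

        // Consume JSON strings, map them to POJOs, and assign event-time timestamps.
        DataStream<Event> events = env
            .addSource(new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), kafkaProps))
            .map(DedupTopologySketch::parseJson)
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(Event e) {
                        return e.eventTime;
                    }
                });

        // "Big" 30-day window keyed on (A,B,C); a count trigger of 1 fires once per event.
        DataStream<Event> deduped = events
            .keyBy(e -> e.a + "|" + e.b + "|" + e.c)
            .window(TumblingEventTimeWindows.of(Time.days(30)))
            .trigger(CountTrigger.of(1))
            .sum("value");

        // Two of the four downstream aggregations; the daily windows follow the same pattern.
        deduped.keyBy(e -> e.a + "|" + e.b + "|" + e.c)
               .window(TumblingEventTimeWindows.of(Time.hours(1)))
               .sum("value")
               .print();
        deduped.keyBy(e -> e.d + "|" + e.e)
               .window(TumblingEventTimeWindows.of(Time.hours(1)))
               .sum("value")
               .print();

        env.execute("dedup-topology-sketch");
    }

    private static Event parseJson(String json) {
        return new Event(); // real JSON deserialization omitted
    }
}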

The cardinality of the window keyed on (A,B,C) is high and could be in
the millions. The values of (A,B,C) are all strings.

I'm doing performance testing by letting the application consume the
past 7 days' data from Kafka. However, the performance is not good and
I'm having some trouble interpreting the results. All tests were done on
AWS using i3.xlarge with 2 slots per TM. This was tested with one,
three, and six TMs. Parallelism was set to the same as the total number
of slots available, e.g. 6 for 3 nodes with 2 slots per TM.

- The application would always start out consuming ~500 messages/s from
  Kafka for about 20-30 minutes, then jump to ~5,000 messages/s. I
  noticed that the disk I/O would drop noticeably when the performance
  jumped.

- Regardless of the number of TMs used, it always peaked at ~5,000
  messages/s and had the same behavior as described above.

- The Flink UI always showed that the Source was back pressured by the
  Big window when the performance was at ~500 messages/s, and no back
  pressure at all once the performance reached ~5,000 messages/s.

- I took some Flight Recorder recordings, and they showed that the Big
  window's time trigger thread was always running
  SystemProcessingTimeService$TriggerTask.run(). Since I'm only
  triggering the Big window by event count, why would this be running?

- Flight Recorder also showed that the Big window thread was doing
  either RocksDB writes or gets most of the time when the performance
  was low. I understand that it keeps its state in RocksDB (see the
  configuration sketch after this list), but I wasn't expecting it to
  tank the performance like this.

- Flight Recorder showed that the hottest methods were all related to
  Kryo serialization.

- GC was OK: no pause was longer than 20ms, and there weren't many of
  them.
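
For context, the state backend is configured roughly like the sketch
below; the checkpoint URI and the checkpoint interval are placeholders,
not our actual settings.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed window state lives in RocksDB on local disk, so every state
        // access goes through (de)serialization. The second argument enables
        // incremental checkpoints; the URI is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink-checkpoints", true));
        env.enableCheckpointing(60_000); // checkpoint every 60s (placeholder interval)
    }
}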

My questions are:

- Why is the performance so bad, and why didn't it scale as I increased
  the number of TMs?

- Why would the performance jump suddenly after 20 minutes or so?

- I know the JSON and POJO serialization is not great. Could it be this
  bad?

Any insights or guidance on how I can diagnose the issue further will be
greatly appreciated.

Thanks,

Ning

Re: Low Performance in High Cardinality Big Window Application

Posted by Ning Shi <ni...@gmail.com>.
Hi Konstantin,

> Could you replace the Kafka source with a custom SourceFunction implementation that just produces new events in a loop as fast as possible? This way we can rule out that ingestion is responsible for the performance jump or the limit at 5,000 events/s, and we can benchmark the Flink job separately.

We built a custom source function and figured out the reason for the
sudden performance jump. It was caused by incorrect watermarks in the
original Kafka stream. Certain events created watermarks in the future,
and as soon as that happened, all subsequent events were dropped as
late data, hence the apparent jump in throughput.
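
One way to guard against this is to clamp the extracted timestamps to
the current wall clock in the timestamp/watermark assigner. Below is a
rough sketch of that idea; the Event class, its eventTime field, and the
out-of-orderness bound are assumptions for illustration, not our actual
code.

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Prevents a single event with a future timestamp from pushing the watermark
// past "now", which would make all subsequent events late and get them dropped.
public class ClampingTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Event> {

    public ClampingTimestampExtractor() {
        super(Time.minutes(5)); // allowed out-of-orderness (placeholder value)
    }

    @Override
    public long extractTimestamp(Event event) {
        return Math.min(event.eventTime, System.currentTimeMillis());
    }
}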

> Kryo is much less efficient than Flink's POJO serializer. In the logs you should see INFO messages about the classes for which Flink falls back to Kryo. Try to replace those with Flink POJOs, i.e. classes with a default constructor and public getters and setters. Since Flink also needs to serialize/deserialize each state object for reading and writing, this applies to your state classes as well, not only to your events. The lines you are looking for are "INFO  org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class com.company.YourClass so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.". You can disable the Kryo fallback temporarily as described in [1]. I would suspect that serialization is a big factor right now, in particular if you see Kryo methods taking a lot of time.

This was very helpful. We found a couple of key objects being
serialized using Kryo. After fixing them and making sure that they
were properly serialized as POJOs, the performance almost doubled from
500 events/s.
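
For anyone else hitting this, the change boils down to the shape below.
The class and field names are made up for illustration (they are not our
actual classes), and the two variants are shown together for brevity; in
practice each top-level class lives in its own file.

// Before: no public no-argument constructor and no setters, so Flink cannot
// treat this class as a POJO and falls back to Kryo (this is what produces
// the "must be processed as GenericType" log line).
class GenericUserAction {
    private final String userId;
    private final long count;

    public GenericUserAction(String userId, long count) {
        this.userId = userId;
        this.count = count;
    }

    public String getUserId() { return userId; }
    public long getCount() { return count; }
}

// After: public class, public no-argument constructor, and a public getter
// and setter for every field, so Flink recognizes it as a POJO and uses its
// POJO serializer instead of Kryo.
public class UserAction {
    private String userId;
    private long count;

    public UserAction() {}

    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }

    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}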

Thank you very much for the advice,

Ning

Re: Low Performance in High Cardinality Big Window Application

Posted by Konstantin Knauf <ko...@data-artisans.com>.
Hi Ning,

Could you replace the Kafka source with a custom SourceFunction
implementation that just produces new events in a loop as fast as
possible? This way we can rule out that ingestion is responsible for
the performance jump or the limit at 5,000 events/s, and we can
benchmark the Flink job separately.
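
A minimal sketch of such a source is below; the Event type and the event
generator are placeholders, and you would plug it in with
env.addSource(new SyntheticEventSource()) in place of the Kafka consumer.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Emits synthetic events in a tight loop, as fast as the downstream operators
// will take them, so Kafka ingestion is removed from the picture entirely.
public class SyntheticEventSource implements SourceFunction<Event> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        long sequence = 0;
        while (running) {
            Event event = nextEvent(sequence++);
            // Hold the checkpoint lock while emitting, as required for sources.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(event);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Placeholder generator; fill in the fields your job actually keys on.
    private Event nextEvent(long sequence) {
        Event e = new Event();
        e.value = sequence;
        return e;
    }
}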

Kryo is much less efficient than Flink's POJO serializer. In the logs
you should see INFO messages about the classes for which Flink falls
back to Kryo. Try to replace those with Flink POJOs, i.e. classes with
a default constructor and public getters and setters. Since Flink also
needs to serialize/deserialize each state object for reading and
writing, this applies to your state classes as well, not only to your
events. The lines you are looking for are "INFO
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were
detected for class com.company.YourClass so it cannot be used as a POJO
type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect
on performance.". You can disable the Kryo fallback temporarily as
described in [1]. I would suspect that serialization is a big factor
right now, in particular if you see Kryo methods taking a lot of time.
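
Disabling the fallback described in [1] is a one-line change on the
execution config, for example:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKryoFallback {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flink will now throw an exception whenever a data type would have
        // to go through Kryo, so the affected classes surface immediately.
        env.getConfig().disableGenericTypes();
    }
}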

Best,

Konstantin

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#disabling-kryo-fallback

Re: Low Performance in High Cardinality Big Window Application

Posted by Ning Shi <ni...@gmail.com>.
> If you have a window larger than hours, then you need to rethink your architecture; this is not streaming anymore. Just because you receive events in a streamed fashion does not mean you need to do all the processing in a streamed fashion.

Thanks for the thoughts, I'll keep that in mind. However, in the test it was not yet storing more than two days' worth of data. I'm very much interested in understanding the root cause of the low performance before moving on to major restructuring.

> Can you store the events in a file or a database and then do batch processing on them after 30 days?

The 30-day window is used only for deduplication, but it triggers for every event and sends the result downstream so that we can still get real-time analytics on the events.

> Another aspect to investigate could be why your source sends duplicate entries.

They are not 100% duplicate events syntactically; the events are only duplicates in a logical sense. For example, the same person doing the same action multiple times at different times of day.

Ning

Re: Low Performance in High Cardinality Big Window Application

Posted by Jörn Franke <jo...@gmail.com>.
If you have a window larger than hours, then you need to rethink your architecture; this is not streaming anymore. Just because you receive events in a streamed fashion does not mean you need to do all the processing in a streamed fashion.
Can you store the events in a file or a database and then do batch processing on them after 30 days?

Another aspect to investigate could be why your source sends duplicate entries.
