Posted to user@flink.apache.org by Theo Diefenthal <th...@scoop-software.de> on 2020/02/17 12:13:56 UTC
Parallelize Kafka Deserialization of a single partition?
Hi,
As with most pipelines, our Flink pipeline starts by parsing source Kafka events into POJOs. We perform this step within a KafkaDeserializationSchema so that we can properly extract the event-time timestamp for the downstream timestamp assigner.
It has turned out that parsing is currently the most CPU-intensive task in our pipeline, so the CPU bounds the number of elements we can ingest per second. Splitting up the partitions further would be hard, as we need to maintain the exact order of events per partition, and it would also require substantial architectural changes to the producers and the Flink job.
I had the idea of putting the parsing task into ordered Async I/O. But as far as I can see, Async I/O can only be plugged into an existing stream, not into the deserialization schema. So the best idea I currently have is to keep parsing in the DeserializationSchema as minimal as possible, just enough to extract the event timestamp, and to do the full parsing downstream in Async I/O. This, however, seems a bit tedious, especially as we have to deal with multiple input formats and would need to develop two parsers for the heavy-load ones: a timestamp-only parser and a full parser.
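To make the two-stage idea concrete, here is a minimal sketch in plain Java without any Flink dependency. It assumes a hypothetical text format in which the event timestamp is the first comma-separated field and consists of ASCII digits only. The RawEvent and FullEvent types and both method names are invented for illustration and are not part of any Flink API. The cheap stage only scans up to the first comma, which is the part that would live in the deserialization schema. The expensive stage could then run in a downstream operator.

```java
import java.nio.charset.StandardCharsets;

class TwoStageParsing {

    /** Hypothetical carrier type: the event timestamp plus the still-unparsed record. */
    static final class RawEvent {
        final long timestampMillis;
        final byte[] payload;

        RawEvent(long timestampMillis, byte[] payload) {
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }
    }

    /** Hypothetical fully parsed event. */
    static final class FullEvent {
        final long timestampMillis;
        final String user;
        final String action;

        FullEvent(long timestampMillis, String user, String action) {
            this.timestampMillis = timestampMillis;
            this.user = user;
            this.action = action;
        }
    }

    /** Cheap stage: reads only the ASCII digits before the first comma. */
    static RawEvent extractTimestamp(byte[] record) {
        long ts = 0;
        int i = 0;
        while (i < record.length && record[i] != ',') {
            ts = ts * 10 + (record[i] - '0');
            i++;
        }
        return new RawEvent(ts, record);
    }

    /** Expensive stage: full parse, meant to run downstream of the source. */
    static FullEvent parseFully(RawEvent raw) {
        String[] fields = new String(raw.payload, StandardCharsets.UTF_8).split(",");
        return new FullEvent(raw.timestampMillis, fields[1], fields[2]);
    }

    public static void main(String[] args) {
        byte[] record = "1581941636000,theo,login".getBytes(StandardCharsets.UTF_8);
        RawEvent raw = extractTimestamp(record);
        FullEvent event = parseFully(raw);
        System.out.println(raw.timestampMillis + " " + event.user + " " + event.action);
    }
}
```

The drawback described above remains visible here: every input format needs both a timestamp-only scanner and a full parser.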
Do you know whether it is somehow possible to parallelize the parsing, or run it via Async I/O, within the KafkaDeserializationSchema? I don't have state access in there, and I don't have a "collector" object either, so one input element needs to produce exactly one output element.
Another question: My parsing produces Java POJOs via "new", which are sent downstream (with object reuse enabled) and will finally be garbage collected once they have reached the sink. Is there some mechanism in Flink that would let me reuse "old" POJOs that have already reached the sink back in the source? All tasks are chained, so theoretically that could be possible?
Best regards
Theo
Re: Parallelize Kafka Deserialization of a single partition?
Posted by Till Rohrmann <tr...@apache.org>.
I have to correct myself. The DataStream API does respect
ExecutionConfig.enableObjectReuse, which takes effect by creating
different Outputs in the OperatorChain. This is also in line with the
different behaviour you are observing.
Concerning your initial question Theo, you could do the following if you
are sure that none of your operators keeps a reference to the created Pojo,
the Pojo is not directly stored in state and there is no AsyncI/O operator
working on the Pojo: In the operator where you do the parsing you can
simply keep a reference to a single instance of the POJO and reuse it
whenever you process a new event. That way you should avoid the creation of
new instances. However, keep in mind that this is a potentially dangerous
operation if any of the aforementioned conditions is violated.
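The single-instance approach and its main hazard can be sketched in plain Java as follows. The Event type, the "timestamp,user" input format and the class name are assumptions made for this illustration. In a Flink job the parse method would sit inside the operator that does the parsing.

```java
import java.util.ArrayList;
import java.util.List;

class ReusedPojoParser {

    /** Hypothetical mutable event type. */
    static final class Event {
        long timestampMillis;
        String user;
    }

    // The one instance that is overwritten for every incoming record.
    private final Event reused = new Event();

    /** Parses "timestamp,user" into the shared instance; no new Event is created per record. */
    Event parse(String line) {
        int comma = line.indexOf(',');
        reused.timestampMillis = Long.parseLong(line.substring(0, comma));
        reused.user = line.substring(comma + 1);
        return reused;
    }

    public static void main(String[] args) {
        ReusedPojoParser parser = new ReusedPojoParser();

        // Safe: the event is fully consumed before the next record is parsed.
        Event first = parser.parse("1,alice");
        System.out.println(first.user); // prints "alice"

        // Unsafe: keeping references across records. Both list entries are the same object.
        List<Event> buffered = new ArrayList<>();
        buffered.add(parser.parse("2,bob"));
        buffered.add(parser.parse("3,carol"));
        System.out.println(buffered.get(0).user); // prints "carol", the value "bob" is gone
    }
}
```

The second half of main shows why the conditions above matter: anything that holds on to the instance, such as state, a buffer or an asynchronous operator, sees it change underneath it.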
Cheers,
Till
On Wed, Feb 19, 2020 at 12:29 PM Timo Walther <tw...@apache.org> wrote:
Re: Parallelize Kafka Deserialization of a single partition?
Posted by Timo Walther <tw...@apache.org>.
Hi Theo,
There are a lot of performance improvements that Flink could make, but they
would further complicate the interfaces and API. They would require users to
have deep knowledge of the runtime to know when it is safe to reuse objects
and when it is not.
The Table/SQL API of Flink uses a lot of these optimizations under the
hood and works on binary data to reduce garbage collection.
For the DataStream API, the community decided to put safety/correctness
before performance in this case. But enabling object reuse (that is,
disabling the defensive copies) and further low-level optimization should
give good results if needed.
Regards,
Timo
On 19.02.20 10:43, Theo Diefenthal wrote:
RE: Parallelize Kafka Deserialization of a single partition?
Posted by Theo Diefenthal <th...@scoop-software.de>.
I have the same experience as Eleanore,
When enabling object reuse, I saw a significant performance improvement,
and in my profiling session I saw that a lot of serialization/deserialization
was no longer performed.
That’s why my original question was meant to go a bit further: It’s good that
Flink reuses objects, but I still need to create a new instance of my
objects per event when parsed, which is ultimately dropped at some
processing step in the flink pipeline later on (map, shuffle or sink).
Wouldn’t it be possible that the “deserialize” method can have an optional
“oldPOJO” input where Flink provides me an unused old instance of my POJO if
it has one left? And if null, I instantiate a new instance in my code? With
billions of small events ingested per day, I can imagine this to be another
small performance improvement especially in terms of garbage collection…
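As a rough illustration of this proposal, the following plain-Java sketch shows what a "deserialize" method with an optional old instance could look like. It is purely hypothetical: ReusingDeserializer is not a Flink interface, and the ArrayDeque merely stands in for whatever mechanism would hand unused instances back from the end of the chain. The Event type and the "timestamp,body" format are assumptions as well.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

class PooledDeserialization {

    /** Hypothetical mutable event type. */
    static final class Event {
        long timestampMillis;
        String body;
    }

    /** Hypothetical interface; 'reuse' is null when no old instance is available. */
    interface ReusingDeserializer<T> {
        T deserialize(byte[] message, T reuse);
    }

    static final class EventDeserializer implements ReusingDeserializer<Event> {
        int allocations = 0; // counts how often a fresh Event had to be created

        @Override
        public Event deserialize(byte[] message, Event reuse) {
            Event target = reuse;
            if (target == null) {
                target = new Event();
                allocations++;
            }
            String text = new String(message, StandardCharsets.UTF_8);
            int comma = text.indexOf(',');
            target.timestampMillis = Long.parseLong(text.substring(0, comma));
            target.body = text.substring(comma + 1);
            return target;
        }
    }

    public static void main(String[] args) {
        ArrayDeque<Event> pool = new ArrayDeque<>(); // instances handed back by the "sink"
        EventDeserializer deserializer = new EventDeserializer();
        for (int i = 0; i < 1000; i++) {
            byte[] message = (i + ",payload-" + i).getBytes(StandardCharsets.UTF_8);
            // poll() returns null when the pool is empty, which triggers an allocation.
            Event event = deserializer.deserialize(message, pool.poll());
            // ... the event would travel through the chained operators here ...
            pool.push(event); // the "sink" returns the instance once it is done with it
        }
        System.out.println("allocations: " + deserializer.allocations); // prints "allocations: 1"
    }
}
```

This only works as shown because the loop is single-threaded and nothing keeps a reference to the event after it is returned to the pool. The String fields are still allocated per record, so the saving is limited to the POJO shell itself.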
Best regards
Theo
From: Till Rohrmann <tr...@apache.org>
Sent: Wednesday, 19 February 2020 07:34
To: Jin Yi <el...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Parallelize Kafka Deserialization of a single partition?
Re: Parallelize Kafka Deserialization of a single partition?
Posted by Till Rohrmann <tr...@apache.org>.
Then my statement must be wrong. Let me double check this. Yesterday when
checking the usage of the objectReuse field, I could only see it in the
batch operators. I'll get back to you.
Cheers,
Till
On Wed, Feb 19, 2020, 07:05 Jin Yi <el...@gmail.com> wrote:
Re: Parallelize Kafka Deserialization of a single partition?
Posted by Jin Yi <el...@gmail.com>.
Hi Till,
I just read your comment:
Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
only affects the DataSet API. DataStream programs will always do defensive
copies. There is a FLIP to improve this behaviour [1].
I have an application written in Apache Beam, with Flink as the runner. The
pipeline is configured in streaming mode, and I see a performance difference
between enabling and disabling object reuse. Also, when running in debug
mode, I noticed that with objectReuse set to true there is no
serialization/deserialization happening between operators, whereas with it
set to false, serialization and deserialization happen between every pair of
operators. Do you have any idea why this is happening?
MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
options.setStreaming(true);
options.setObjectReuse(true);
Thanks a lot!
Eleanore
On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann <tr...@apache.org> wrote:
Re: Parallelize Kafka Deserialization of a single partition?
Posted by Till Rohrmann <tr...@apache.org>.
Hi Theo,
The KafkaDeserializationSchema does not allow returning asynchronous
results. Hence, Flink will always wait until
KafkaDeserializationSchema.deserialize returns the parsed value.
Consequently, the only way I can think of to offload the complex parsing
logic would be to do it in a downstream operator where you could use
AsyncI/O to run the parsing logic in a thread pool, for example.
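The core of that suggestion, parsing in a thread pool while still emitting results in input order, can be sketched in plain Java. This is not Flink code: in a Flink job the equivalent building block would be an AsyncFunction used in ordered mode (AsyncDataStream.orderedWait). The expensiveParse method is a placeholder assumption for the real parser.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrderedParallelParsing {

    /** Stand-in for an expensive parser; here it only upper-cases the input. */
    static String expensiveParse(String raw) {
        return raw.toUpperCase(Locale.ROOT);
    }

    /** Parses all records concurrently but returns the results in input order. */
    static List<String> parseInOrder(List<String> rawRecords, ExecutorService pool) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String raw : rawRecords) {
            pending.add(CompletableFuture.supplyAsync(() -> expensiveParse(raw), pool));
        }
        List<String> parsed = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            // Joining in submission order preserves the per-partition record order,
            // no matter which task finishes first.
            parsed.add(future.join());
        }
        return parsed;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(parseInOrder(Arrays.asList("a", "b", "c"), pool)); // prints [A, B, C]
        } finally {
            pool.shutdown();
        }
    }
}
```

The order guarantee comes from waiting on the futures in submission order, so one slow record delays everything behind it. That is the price of keeping the exact order of events.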
Alternatively, you could think about a simple program which transforms your
input events into another format where it is easier to extract the
timestamp from. This comes, however, at the cost of another Kafka topic.
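One possible shape for such an intermediate format, sketched in plain Java, is an envelope that puts the timestamp in front of the untouched original payload. The 8-byte big-endian layout and all names here are assumptions for illustration, not an existing format. A small preprocessing program would write these envelopes to the second topic, and the deserialization schema of the main job would then only need readTimestamp.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class TimestampEnvelope {

    /** Wraps the payload: 8-byte big-endian timestamp followed by the original bytes. */
    static byte[] wrap(long timestampMillis, byte[] payload) {
        return ByteBuffer.allocate(Long.BYTES + payload.length)
                .putLong(timestampMillis)
                .put(payload)
                .array();
    }

    /** Reads the timestamp without touching the payload. */
    static long readTimestamp(byte[] envelope) {
        return ByteBuffer.wrap(envelope).getLong();
    }

    /** Copies out the original payload for the full parser. */
    static byte[] readPayload(byte[] envelope) {
        byte[] payload = new byte[envelope.length - Long.BYTES];
        System.arraycopy(envelope, Long.BYTES, payload, 0, payload.length);
        return payload;
    }

    public static void main(String[] args) {
        byte[] original = "{\"user\":\"theo\"}".getBytes(StandardCharsets.UTF_8);
        byte[] envelope = wrap(1581941636000L, original);
        System.out.println(readTimestamp(envelope));
        System.out.println(new String(readPayload(envelope), StandardCharsets.UTF_8));
    }
}
```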
Currently, enabling object reuse via ExecutionConfig.enableObjectReuse()
only affects the DataSet API. DataStream programs will always do defensive
copies. There is a FLIP to improve this behaviour [1].
[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
Cheers,
Till