Posted to user@flink.apache.org by Peter Zende <pe...@gmail.com> on 2019/07/13 13:04:17 UTC

Union of streams performance issue (10x)

Hi all

We have a pipeline (running on YARN, Flink v1.7.1) that consumes a union of
Kafka and HDFS sources. We noticed that the throughput is 10 times higher if
only one of these sources is consumed. While trying to identify the problem, I
implemented a no-op source and unioned it with one of the real sources:

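  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
  import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
  import org.apache.flink.streaming.api.scala._ // implicit TypeInformation for addSource/union

  // A parallel source that emits no records and finishes immediately.
  class NoOpSourceFunction extends ParallelSourceFunction[GenericRecord] {
    override def run(ctx: SourceContext[GenericRecord]): Unit = {}
    override def cancel(): Unit = {}
  }

  mainStream.union(env.addSource(new NoOpSourceFunction()))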

I noticed that whenever I union with any source like the one above, or union
the stream with itself, I get the same performance hit.
When I compare the job graphs in the Flink UI, the only difference is that,
in the case of a union, the two sources aren't chained to the subsequent
downstream operators (transformation steps); both are connected to them with
ship_strategy: FORWARD.
When only one source is present, it is chained to the transformation steps.

To avoid the union (and/or forward partitioning), I tried connecting the
streams with a CoFlatMapFunction to get the same result, but without any gain
in performance. I was also thinking about reading the HDFS stream in parallel
and using iterate() to feed it back to a previous operator.

After some trial and error, I'd like to ask for your advice. What is the best
practice here? Which options or tools are there to analyze the execution plan
apart from the Flink plan visualizer and the provided web UI?

Thanks
Peter

Re: Union of streams performance issue (10x)

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Peter,

The performance drop is probably due to de/serialization.
When tasks are chained, records are simply forwarded as Java objects via
method calls.
When a task chain is broken into multiple operators, the records (Java
objects) are serialized by the sending task, possibly shipped over the
network, and deserialized by the receiving task.
Depending on the logic of the tasks, this can cause a performance drop.
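To make the difference concrete, here is a toy sketch in plain Scala with no Flink involved. The Event case class, transform, and the hand-written serialize/deserialize pair are hypothetical stand-ins: Flink uses its own TypeSerializers rather than this code, but an unchained hop adds the same kind of per-record work, one encode on the sending side and one decode on the receiving side.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical record type standing in for the GenericRecord in the thread.
case class Event(id: Long, payload: String)

// Downstream operator logic, identical in both cases.
def transform(e: Event): Event = e.copy(payload = e.payload.toUpperCase)

// Chained: the object is handed over with a plain method call.
def chained(e: Event): Event = transform(e)

// Unchained: the sending side writes the record field by field into bytes
// (what would land in a network buffer); the receiving side reads the bytes
// back into a new object before it can run the operator logic.
def serialize(e: Event): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  out.writeLong(e.id)      // 8 bytes
  out.writeUTF(e.payload)  // 2-byte length prefix + encoded characters
  out.flush()
  bytes.toByteArray
}

def deserialize(buf: Array[Byte]): Event = {
  val in = new DataInputStream(new ByteArrayInputStream(buf))
  Event(in.readLong(), in.readUTF())
}

def unchained(e: Event): Event = transform(deserialize(serialize(e)))

val record = Event(1L, "kafka")
println(chained(record))          // Event(1,KAFKA)
println(unchained(record))        // Event(1,KAFKA)
println(serialize(record).length) // 15
```

Both paths produce the same result; the unchained one simply pays for an extra encode/decode round trip on every record, and that cost grows with record size and complexity.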

Two tasks can only be chained if both have the same parallelism, the sender
task sends to a single task, and the receiver receives from a single task.
The union receives from two tasks, which cuts the chain.
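The rule above can be written down as a small predicate. This is a toy model in plain Scala, not Flink's actual implementation (Flink's own check also considers things such as chaining strategies and slot sharing groups); Operator, Edge, Partitioning, and canChain are hypothetical names. It shows why a union cuts the chain even though both inputs use FORWARD: the downstream operator ends up with two input edges.

```scala
// Toy job-graph model; all names here are hypothetical, not Flink classes.
sealed trait Partitioning
case object Forward extends Partitioning
case object Rebalance extends Partitioning

case class Operator(name: String, parallelism: Int)
case class Edge(from: Operator, to: Operator, partitioning: Partitioning)

// Simplified chaining rule: the edge is FORWARD (each sender subtask feeds
// exactly one receiver subtask), both sides have the same parallelism, and
// the receiver has exactly one input edge in the graph.
def canChain(edge: Edge, graph: Seq[Edge]): Boolean = {
  val receiverInputs = graph.count(_.to == edge.to)
  edge.partitioning == Forward &&
    edge.from.parallelism == edge.to.parallelism &&
    receiverInputs == 1
}

val kafka = Operator("kafka-source", 4)
val hdfs = Operator("hdfs-source", 4)
val transform = Operator("transform", 4)

// One source: a single FORWARD input, so it can be chained.
val single = Seq(Edge(kafka, transform, Forward))
println(single.map(canChain(_, single))) // List(true)

// Union of two sources: two FORWARD inputs, so neither edge is chainable.
val unioned = Seq(Edge(kafka, transform, Forward), Edge(hdfs, transform, Forward))
println(unioned.map(canChain(_, unioned))) // List(false, false)

// Unioning a stream with itself also gives the receiver two inputs.
val selfUnion = Seq(Edge(kafka, transform, Forward), Edge(kafka, transform, Forward))
println(selfUnion.map(canChain(_, selfUnion))) // List(false, false)
```

The self-union case matches the observation in the original question: the extra input edge alone is enough to break the chain, regardless of what the second source emits.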

Best, Fabian
