Posted to user@flink.apache.org by Antoine Philippot <an...@teads.tv> on 2017/10/13 08:16:08 UTC
Regression for dataStream.rescale method from 1.2.1 to 1.3.2
Hi,
After migrating our project from Flink 1.2.1 to Flink 1.3.2, we noticed a
big performance drop caused by poor balancing of vertices across task managers.
In our use case, we set the default parallelism to the number of task
managers:
val stream: DataStream[Array[Byte]] = env
  .addSource(new FlinkKafkaConsumer09[Array[Byte]]( ... ))
  .name("kafkaConsumer").rescale // 1 operator / instance
val parallelism = nbTaskManagers * nbTaskSlots
val hydratedStream: DataStream[Message] = stream
  .flatMap(avroDeserializer).name("AvroDeserializer").setParallelism(parallelism)
  .flatMap(messageParser).name("MessageParser").setParallelism(parallelism)
  .flatMap(messageHydration).name("Hydration").setParallelism(parallelism)
  .filter(MessageFilter).name("MessageFilter").setParallelism(parallelism)
hydratedStream.rescale // 1 operator / instance
  .addSink(kafkaSink).name("KafkaSink")
Taking an example of 2 task managers with 4 slots each, with Flink 1.2.1
we had on each instance:
- 1 kafkaConsumer -> 4 mapOperators -> 1 kafkaSink
But with exactly the same code on Flink 1.3.2, the sinks are all located
on one instance:
first instance :
- 1 kafkaConsumer -> 4 mapOperators -> 2 kafkaSink
second instance :
- 1 kafkaConsumer -> 4 mapOperators -> no kafkaSink (network transfer to
the first task manager)
This behaviour is the same with more task managers, either in a local
cluster or in a YARN cluster.
Is it a bug, or should I update my code to get the same behaviour as Flink
1.2.1?
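For reference, the logical wiring expected from rescale in the 2 x 4 example above can be modelled with a small standalone sketch. This is not Flink's implementation: RescaleSketch and targets are hypothetical names, the contiguous index assignment is an assumption, and the model only handles parallelisms that are multiples of each other. It shows which subtasks are connected, not where the scheduler places them, and placement is the part that appears to differ between 1.2.1 and 1.3.2.

```scala
// Simplified model of the point-to-point wiring described for rescale.
// Hypothetical helper, not Flink code; assumes divisible parallelisms.
object RescaleSketch {

  /** Downstream subtask indices that upstream subtask `up` sends to. */
  def targets(up: Int, upPar: Int, downPar: Int): Seq[Int] = {
    require(up >= 0 && up < upPar, "upstream index out of range")
    if (downPar >= upPar) {
      require(downPar % upPar == 0, "sketch assumes divisible parallelisms")
      val fanOut = downPar / upPar
      // Each upstream subtask feeds its own contiguous block of downstream subtasks.
      ((up * fanOut) until ((up + 1) * fanOut)).toList
    } else {
      require(upPar % downPar == 0, "sketch assumes divisible parallelisms")
      val fanIn = upPar / downPar
      // Each group of `fanIn` upstream subtasks feeds a single downstream subtask.
      List(up / fanIn)
    }
  }

  def main(args: Array[String]): Unit = {
    val nbTaskManagers = 2
    val nbTaskSlots = 4
    val parallelism = nbTaskManagers * nbTaskSlots // 8

    // kafkaConsumer (parallelism 2) -> AvroDeserializer (parallelism 8)
    for (src <- 0 until nbTaskManagers)
      println(s"source $src -> flatMap ${targets(src, nbTaskManagers, parallelism).mkString(",")}")

    // MessageFilter (parallelism 8) -> KafkaSink (parallelism 2)
    for (f <- 0 until parallelism)
      println(s"filter $f -> sink ${targets(f, parallelism, nbTaskManagers).mkString(",")}")
  }
}
```

In this model each sink subtask receives data from exactly 4 filter subtasks, so the exchange can stay local only if those 4 subtasks and their sink are scheduled on the same task manager.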
Re: Regression for dataStream.rescale method from 1.2.1 to 1.3.2
Posted by Till Rohrmann <tr...@apache.org>.
Hi Antoine,
this looks like a regression to me. I'll investigate how this could happen
and let you know once I find something.
Cheers,
Till