You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Le Xu <sh...@gmail.com> on 2017/09/18 20:09:36 UTC

Flink SocketTextStream source scheduled to a single machine

Hello!

I'm trying to figure out how it happens: I'm having a program reading from
multiple socketTextStream and these text streams feed into different data
flow (and these data streams never connect in my job). It looks something
similar to below:

for(int i =0; i< hosts.length; i++) {

    DataStream<String> someStream = env.socketTextStream(hosts[i],
ports[i]);
    DataStream<Tuple2<String, String>> joinedAdImpressions =
rawMessageStream.rebalance() ...

However, when I run the job on a cluster I found that all source task have
been scheduled to one machine so the machine becomes a severe bottleneck
for the performance. Any ideas how would this happen?

Thanks!

Re: Flink SocketTextStream source scheduled to a single machine

Posted by Till Rohrmann <tr...@apache.org>.
Hi Le Xu,

the reason why all different SocketTextStreamFunction sources are scheduled
to the same machine is because of slot sharing. Slot sharing allows Flink
to schedule tasks belonging to different operators into the same slot. This
allows, for example, to achieve better colocation between tasks which
depend on each other (e.g. build-side, probe-side and actual join operator
running in the same slot). Moreover, it makes it easier to reason about how
many slots your application needs, which is the maximum parallelism of your
job.

However, the downside is that independent components of your job won't be
spread across the cluster but usually end up in the same slot(s)
(consequently on the same machine, too) due to slot sharing.

You can disable slot sharing for parts of your job if you set explicitly a
different slot sharing group name. Then only operators which are assigned
to the same slot sharing group are subject to slot sharing. Down stream
operators inherit the slot sharing group from their inputs. Thus, if you
have an embarrassingly parallel job, then it suffices to only the set the
slot sharing group at the sources.

for(int i =0; i< hosts.length; i++) {
    DataStream<String> someStream = env
       .socketTextStream(hosts[i], ports[i])
       .slotSharingGroup("socket_" + i);

    DataStream<Tuple2<String, String>> joinedAdImpressions =
rawMessageStream.rebalance() ...
}

Cheers,
Till

On Mon, Sep 18, 2017 at 10:09 PM, Le Xu <sh...@gmail.com> wrote:

> Hello!
>
> I'm trying to figure out how it happens: I'm having a program reading from
> multiple socketTextStream and these text streams feed into different data
> flow (and these data streams never connect in my job). It looks something
> similar to below:
>
> for(int i =0; i< hosts.length; i++) {
>
>     DataStream<String> someStream = env.socketTextStream(hosts[i],
> ports[i]);
>     DataStream<Tuple2<String, String>> joinedAdImpressions =
> rawMessageStream.rebalance() ...
>
> However, when I run the job on a cluster I found that all source task have
> been scheduled to one machine so the machine becomes a severe bottleneck
> for the performance. Any ideas how would this happen?
>
> Thanks!
>