Posted to user@flink.apache.org by KristoffSC <kr...@gmail.com> on 2020/02/20 22:38:56 UTC

How JobManager and TaskManager find each other?

Hi all,
I was wondering how the JobManager and TaskManager find each other.
Do they use multicast for this?

Can it be configured to use domain names instead of IPs?
What do I have to do to run two Flink clusters in the same IP network?
How should I start a TaskManager so that it connects to cluster
B and not A?

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How JobManager and TaskManager find each other?

Posted by Yang Wang <da...@gmail.com>.
Hi KristoffSC,

My answers to your questions are inline.

> 1. task deployment descriptor
The `TaskDeploymentDescriptor` is used by the JobMaster to submit a task to
a TaskManager.
Since the JobMaster knows all the operators and their locations, it puts
the upstream operator locations
in the `TaskDeploymentDescriptor`. So when the task is running, it always
knows how to communicate
with the others.

> 2. Kubernetes job cluster
When you deploy on Kubernetes, the situation is very different from NAT on
a PaaS. Kubernetes always has a
default overlay network. Each JobManager/TaskManager (i.e. Kubernetes Pod)
is assigned
a unique hostname and IP[1]. They can talk to each other directly, so you
do not need to set any
bind-host or bind-port.

> 3. Modify jobmanager.rpc.address
You need to create a Kubernetes service and set
`jobmanager.rpc.address` to the service name.
This is used for JobManager fault tolerance. When the JobManager fails
and is relaunched,
the TaskManagers can still use the service name to re-register with the
JobManager.
You do not need to update conf/slaves; just follow the guide[2].
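
To illustrate why a stable service name helps, here is a minimal Python
sketch. It is a toy model, not Kubernetes or Flink code: `ClusterDns`, the
service name `flink-jobmanager`, and the IP addresses are assumptions made
up for this example, and the Service's virtual IP and endpoint routing are
collapsed into a single name lookup.

```python
class ClusterDns:
    """Toy stand-in for cluster DNS plus Service routing (illustrative only)."""

    def __init__(self):
        self._records = {}

    def point(self, service_name, pod_ip):
        # The platform updates where the service name leads.
        self._records[service_name] = pod_ip

    def resolve(self, service_name):
        return self._records[service_name]


dns = ClusterDns()
dns.point("flink-jobmanager", "10.1.0.5")

# The TaskManager only ever stores the service name, never the Pod IP.
rpc_address = "flink-jobmanager"  # value of jobmanager.rpc.address
first = dns.resolve(rpc_address)

# The JobManager Pod fails and is relaunched with a different IP.
dns.point("flink-jobmanager", "10.1.0.9")

# Re-registration uses the same configured name and reaches the new Pod.
second = dns.resolve(rpc_address)
print(first, second)
```

The configured value never changes; only what the name leads to does, which
is why the TaskManager configuration can stay untouched across JobManager
restarts.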


[1].
https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
[2].
https://github.com/apache/flink/tree/release-1.10/flink-container/kubernetes


Best,
Yang

In reply to KristoffSC <kr...@gmail.com>, Mon, Mar 2, 2020, 3:25 PM.

Re: How JobManager and TaskManager find each other?

Posted by KristoffSC <kr...@gmail.com>.
Thanks for the clarification about NAT.

Moving the NAT issue aside for a moment:

Is the process of sending the "task deployment descriptor" that you mentioned in
"Feb 26, 2020; 4:18pm", and especially the process of notifying a TaskManager
about the IPs of the other TaskManagers participating in a job, described
somewhere? I'm familiar with [1] and [2], but they contain no information
about sending the IP information of TaskManagers.


Another question is how this all works for a Kubernetes Job Session Cluster
deployment when the nodes are deployed across many physical machines inside
the Kubernetes cluster, assuming I'm using Kubernetes as described in [3].

The final question: do I have to modify jobmanager.rpc.address and the
flink/conf/slaves file when running a Docker JobCluster on Kubernetes? The
default values are localhost.
Or is just following [3] fine?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html
[3]
https://github.com/apache/flink/tree/release-1.10/flink-container/kubernetes




Re: How JobManager and TaskManager find each other?

Posted by Yang Wang <da...@gmail.com>.
Well, I think the biggest challenge when running Flink on PaaS platforms
is the NAT network. Since the JobManager and TaskManagers are running
in a NAT network, they cannot talk to each other directly. What we need to
do is make the JM/TM bind to the local machine hostname and use a bind-port
for the communication.

The community is still working on this.


Best,
Yang

In reply to KristoffSC <kr...@gmail.com>, Thu, Feb 27, 2020, 6:21 PM.

Re: How JobManager and TaskManager find each other?

Posted by KristoffSC <kr...@gmail.com>.
> So do you mean the ip address changes during running or the taskmanager
> failed and relaunched with a same hostname, but the ip address is
> different?

That too, but actually I was thinking about running Flink on PaaS
platforms where a process can be re-spawned at runtime on a different
machine under a different IP.

Supposedly OpenShift does this.




Re: How JobManager and TaskManager find each other?

Posted by Yang Wang <da...@gmail.com>.
So do you mean the IP address changes while the job is running, or that the
TaskManager failed and was relaunched with the same hostname but a
different IP address?

I am not sure whether the TM registers with the JM using its hostname or its
IP address. We could confirm that with @Zhijiang.

As for NAT, Flink currently does not work well with it, since the
TaskManager does not provide a bind host/port for others to connect to.
There are some tickets tracking this problem[1].


[1]. https://issues.apache.org/jira/browse/FLINK-15911


Best,
Yang

In reply to KristoffSC <kr...@gmail.com>, Wed, Feb 26, 2020, 11:34 PM.

Re: How JobManager and TaskManager find each other?

Posted by KristoffSC <kr...@gmail.com>.
Thank you very much.
What if a node's IP changes? Does it also support DNS, or "raw" IP
addresses only?
I'm thinking about cloud deployments where the actual service/process can be
rescheduled to a different box but there is a name resolution mechanism.

Also, what if there is NAT between the TaskManager and the JobManager?




Re: How JobManager and TaskManager find each other?

Posted by Zhijiang <wa...@aliyun.com>.
I guess you are referring to the data shuffle process among different task managers.

When a task manager (TM) registers itself with the job manager (JM), it also reports the IP address and data port that it listens on.
During task scheduling, the upstream TM's address info (IP, port) is included in the task deployment descriptor for the respective
downstream tasks. The downstream tasks can then connect to the remote upstream TM
to request data.

In short, the JM knows the addresses of all TMs via registration, and these addresses are then sent to the required peers during task scheduling and deployment.
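
The flow described above can be sketched roughly as follows. This is a toy
model in Python, not Flink's actual classes: `TaskManagerLocation`,
`DeploymentDescriptor`, the `JobManager` methods, and all addresses are
names invented for illustration.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class TaskManagerLocation:
    """Address a TM reports when it registers (illustrative name)."""
    host: str
    data_port: int


@dataclass
class DeploymentDescriptor:
    """Toy stand-in for the task deployment descriptor."""
    task_name: str
    upstream_locations: list


@dataclass
class JobManager:
    registered: dict = field(default_factory=dict)  # tm_id -> TaskManagerLocation
    placement: dict = field(default_factory=dict)   # task_name -> tm_id

    def register(self, tm_id, location):
        # Step 1: a TM registers and reports the host/port it listens on.
        self.registered[tm_id] = location

    def place(self, task_name, tm_id):
        # Step 2: scheduling decides which TM runs which task.
        self.placement[task_name] = tm_id

    def descriptor_for(self, task_name, upstream_tasks):
        # Step 3: the descriptor carries the addresses of the upstream TMs.
        locations = [self.registered[self.placement[t]] for t in upstream_tasks]
        return DeploymentDescriptor(task_name, locations)


jm = JobManager()
jm.register("tm-1", TaskManagerLocation("10.0.0.11", 40001))
jm.register("tm-2", TaskManagerLocation("10.0.0.12", 40002))
jm.place("source", "tm-1")
jm.place("sink", "tm-2")

# The downstream task learns where to fetch its input from.
sink_descriptor = jm.descriptor_for("sink", upstream_tasks=["source"])
print(sink_descriptor.upstream_locations)
```

The point of the sketch is that TMs never discover each other on their own:
every peer address a task uses was first reported to the JM and then handed
out with the deployment.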

Best,
Zhijiang


In reply to KristoffSC <kr...@gmail.com>, Wed, Feb 26, 2020, 19:39.


Re: How JobManager and TaskManager find each other?

Posted by KristoffSC <kr...@gmail.com>.
Thanks all for the answers.

One more question, though. In [1] we can see that task managers talk
to each other, sending data streams. How does each task manager know the
addresses of the other task managers?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#job-managers-task-managers-clients




Re: How JobManager and TaskManager find each other?

Posted by Yang Wang <da...@gmail.com>.
Hi Krzysztof,

Xintong has shared some information about the configuration. I just want to
provide some additional
information.

>> How JobManager and TaskManager find each other?
For a non-HA Flink cluster, the TaskManager must be pre-configured with the
JobManager address.
For an HA Flink cluster, the leader retrieval service is used by the
TaskManager to get the JobManager
address. Currently, Flink provides a ZooKeeper implementation, and it is
widely used in
production.

>> Can it be configure to use domain names instead IP's?
It can be a domain name, but you should make sure that it can be
resolved by the other JobManagers/
TaskManagers.

>> What I have to do to have two Flink Clusters in same IP network?
I am not sure what you mean. Do you mean you have a NAT network and
multiple Docker containers
share an internal IP address?

>> How I should start task manager in order to tell him, to connect to cluster B not A?
As I said in the first section: for a non-HA Flink cluster, you
need to specify the JobManager RPC
address and port. For an HA Flink cluster, you do not need to do anything
extra, because the address is retrieved through the leader retrieval service.
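
As a rough sketch of the two lookup paths (non-HA and HA), consider the
Python below. It is not Flink code: `resolve_jobmanager` and the
`leader_store` dict (standing in for ZooKeeper) are hypothetical, and the
hostnames and addresses are made up. The configuration keys
`jobmanager.rpc.address`, `jobmanager.rpc.port`, `high-availability`, and
`high-availability.cluster-id` are real Flink options; the defaults used
here are my understanding of them for Flink 1.10.

```python
DEFAULT_RPC_PORT = 6123  # Flink's default for jobmanager.rpc.port


def resolve_jobmanager(config, leader_store=None):
    """Return (host, port) of the JobManager a TaskManager should contact."""
    ha_mode = str(config.get("high-availability", "NONE")).upper()
    if ha_mode == "NONE":
        # Non-HA: the address is simply read from the TM's own configuration.
        host = config["jobmanager.rpc.address"]
        port = int(config.get("jobmanager.rpc.port", DEFAULT_RPC_PORT))
        return host, port
    if leader_store is None:
        raise ValueError("HA mode needs a leader store to look up the current leader")
    # HA: the current leader is looked up in an external store (a plain dict
    # here, ZooKeeper in a real setup), keyed by the cluster id.
    cluster_id = config.get("high-availability.cluster-id", "/default")
    return leader_store[cluster_id]


non_ha = {"jobmanager.rpc.address": "jm-b.example.internal",
          "jobmanager.rpc.port": "6123"}
print(resolve_jobmanager(non_ha))

ha = {"high-availability": "zookeeper",
      "high-availability.cluster-id": "/cluster-b"}
leaders = {"/cluster-a": ("10.0.0.5", 6123), "/cluster-b": ("10.0.0.9", 6123)}
print(resolve_jobmanager(ha, leaders))
```

In the HA branch of this sketch, two clusters sharing one store are kept
apart only by their cluster id, so each cluster would need its own value.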


Best,
Yang


In reply to Xintong Song <to...@gmail.com>, Fri, Feb 21, 2020, 11:41 AM.

Re: How JobManager and TaskManager find each other?

Posted by Xintong Song <to...@gmail.com>.
Hi Krzysztof,

You can use the following configuration options to specify JM/TM addresses
and ports.
- jobmanager.rpc.address
- jobmanager.rpc.port
- taskmanager.host
- taskmanager.rpc.port

The configuration accepts both IP addresses and hostnames.

If you have two Flink Clusters, you can specify different JM address/ports
in your TM configurations, so the TM knows which JM to connect to.
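
As an illustration of the two-cluster case, the Python sketch below renders
the relevant flink-conf.yaml entries for one TaskManager of each cluster.
The helper `taskmanager_conf` and all hostnames are hypothetical; only the
option names come from the list above, and 6123 is Flink's default
`jobmanager.rpc.port`.

```python
def taskmanager_conf(jm_address, jm_port, tm_host):
    """Render flink-conf.yaml entries pointing a TaskManager at one JobManager."""
    entries = {
        "jobmanager.rpc.address": jm_address,
        "jobmanager.rpc.port": jm_port,
        "taskmanager.host": tm_host,
    }
    return "\n".join(f"{key}: {value}" for key, value in entries.items())


# Hypothetical hostnames for two clusters living in the same IP network.
conf_a = taskmanager_conf("jm-a.example.internal", 6123, "tm-a1.example.internal")
conf_b = taskmanager_conf("jm-b.example.internal", 6123, "tm-b1.example.internal")

print(conf_a)
print()
print(conf_b)
```

A TaskManager started with the second configuration registers with cluster
B simply because that is the only JobManager address it knows about.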

Thank you~

Xintong Song



In reply to KristoffSC <kr...@gmail.com>, Fri, Feb 21, 2020, 6:38 AM.