You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "LINZ, Arnaud" <AL...@bouyguestelecom.fr> on 2018/02/01 15:23:11 UTC

How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?

Hello,

I am using Flink 1.3.2 and I'm struggling to achieve something that should be simple.
For isolation reasons, I want to start multiple long living yarn session containers (with the same user) and choose at run-time, when I start a HA streaming app, which container will hold it.

I start my yarn session with the command line option : -Dyarn.properties-file.location=mydir
The session is created and a .yarn-properties-$USER file is generated.

And I've tried the following to submit my job:

CASE 1
flink-conf.yaml : yarn.properties-file.location: mydir
flink run options : none

  *   Uses zookeeper and works  - but I cannot choose the container as the property file is global.

CASE 2
flink-conf.yaml : nothing
flink run options : -yid applicationId

  *   Do not use zookeeper, tries to connect to yarn job manager but fails in "Job submission to the JobManager timed out" error

CASE 3
flink-conf.yaml : nothing
flink run options : -yid applicationId and -yD with all dynamic properties found in the "dynamicPropertiesString" of .yarn-properties-$USER file

  *   Same as case 2

CASE 4
flink-conf.yaml : nothing
flink run options : -yD yarn.properties-file.location=mydir

  *   Tries to connect to local (non yarn) job manager (and fails)

CASE 5
Even weirder:
flink-conf.yaml : yarn.properties-file.location: mydir
flink run options : -yD yarn.properties-file.location=mydir

  *   Still tries to connect to local (non yarn) job manager!

What am I doing wrong?

Logs extracts :
CASE 1:
2018:02:01 15:43:20 - Waiting until all TaskManagers have connected
2018:02:01 15:43:20 - Starting client actor system.
2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:20 - Trying to select the network interface and address to use by connecting to the leading JobManager.
2018:02:01 15:43:20 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
2018:02:01 15:43:21 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Slf4jLogger started
2018:02:01 15:43:21 - Starting remoting
2018:02:01 15:43:21 - Remoting started; listening on addresses :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]
2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - TaskManager status (2/1)
2018:02:01 15:43:21 - All TaskManagers are connected
2018:02:01 15:43:21 - Submitting job with JobID: f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.
2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager yet.
2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77).
2018:02:01 15:43:21 - Disconnect from JobManager null.
2018:02:01 15:43:21 - Connect to JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
2018:02:01 15:43:21 - Connected to JobManager at Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 388af5b8-5555-4923-8ee4-8a4b9bfbb0b9.
2018:02:01 15:43:21 - Sending message to JobManager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and wait for progress
2018:02:01 15:43:21 - Upload jar files to job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:43:21 - Blob client connecting to akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
2018:02:01 15:43:22 - Submit job to the job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was successfully submitted to the JobManager akka://flink/deadLetters.
2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to status RUNNING.

CASE 2:
2018:02:01 15:48:43 - Waiting until all TaskManagers have connected
2018:02:01 15:48:43 - Starting client actor system.
2018:02:01 15:48:43 - Trying to select the network interface and address to use by connecting to the leading JobManager.
2018:02:01 15:48:43 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
2018:02:01 15:48:43 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
2018:02:01 15:48:43 - Slf4jLogger started
2018:02:01 15:48:43 - Starting remoting
2018:02:01 15:48:43 - Remoting started; listening on addresses :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:34140]
2018:02:01 15:48:43 - TaskManager status (2/1)
2018:02:01 15:48:43 - All TaskManagers are connected
2018:02:01 15:48:43 - Submitting job with JobID: cd3e0e223c57d01d415fe7a6a308576c. Waiting for job completion.
2018:02:01 15:48:43 - Received SubmitJobAndWait(JobGraph(jobId: cd3e0e223c57d01d415fe7a6a308576c)) but there is no connection to a JobManager yet.
2018:02:01 15:48:43 - Received job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c).
2018:02:01 15:48:43 - Disconnect from JobManager null.
2018:02:01 15:48:43 - Connect to JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
2018:02:01 15:48:43 - Connected to JobManager at Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 00000000-0000-0000-0000-000000000000.
2018:02:01 15:48:43 - Sending message to JobManager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c) and wait for progress
2018:02:01 15:48:43 - Upload jar files to job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:48:43 - Blob client connecting to akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
2018:02:01 15:48:45 - Submit job to the job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:49:45 - Terminate JobClientActor.
2018:02:01 15:49:45 - Disconnect from JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].

Then
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

CASE 3,4

2018:02:01 15:35:14 - Starting client actor system.
2018:02:01 15:35:14 - Trying to select the network interface and address to use by connecting to the leading JobManager.
2018:02:01 15:35:14 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
2018:02:01 15:35:14 - Retrieved new target address localhost/127.0.0.1:6123.
2018:02:01 15:35:15 - Trying to connect to address localhost/127.0.0.1:6123
2018:02:01 15:35:15 - Failed to connect from address 'elara-edge-u2-n01/10.136.170.196': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.225': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)
2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.196': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.225': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)
2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.196': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)





________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

Re: How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?

Posted by Chesnay Schepler <ch...@apache.org>.
For future reference, the created JIRA: 
https://issues.apache.org/jira/browse/FLINK-8580

On 07.02.2018 10:48, LINZ, Arnaud wrote:
>
> Hi,
>
> Without any other solution, I made a shell script that copies the 
> original content of FLINK_CONF_DIR in a temporary rep, modify 
> flink-conf.yaml to set yarn.properties-file.location, and change 
> FLINK_CONF_DIR to that temp rep before executing flink.
>
> I am now able to select the container I want, but I think it should be 
> made simpler…
>
> I’ll open a Jira.
>
> Best regards,
>
> Arnaud
>
> *De :*LINZ, Arnaud
> *Envoyé :* jeudi 1 février 2018 16:23
> *À :* user@flink.apache.org
> *Objet :* How to handle multiple yarn sessions and choose at runtime 
> the one to submit a ha streaming job ?
>
> Hello,
>
> I am using Flink 1.3.2 and I’m struggling to achieve something that 
> should be simple.
>
> For isolation reasons, I want to start multiple long living yarn 
> session containers (with the same user) and choose at run-time, when I 
> start a HA streaming app, which container will hold it.
>
> I start my yarn session with the command line option : 
> -Dyarn.properties-file.location=mydir
>
> The session is created and a .yarn-properties-$USER file is generated.
>
> And I’ve tried the following to submit my job:
>
> *CASE 1 *
>
> *flink-conf.yaml*: yarn.properties-file.location: mydir
>
> *flink run options*: none
>
>   * Uses zookeeper and works  – but I cannot choose the container as
>     the property file is global.
>
> **
>
> *CASE 2*
>
> *flink-conf.yaml*: nothing
>
> *flink run options*: -yid applicationId
>
>   * Do not use zookeeper, tries to connect to yarn job manager but
>     fails in “Job submission to the JobManager timed out” error
>
> **
>
> *CASE 3*
>
> *flink-conf.yaml*: nothing
>
> *flink run options*: -yid applicationId and -yD with all dynamic 
> properties found in the “dynamicPropertiesString” of 
> .yarn-properties-$USER file
>
>   * Same as case 2
>
> **
>
> *CASE 4*
>
> *flink-conf.yaml*: nothing
>
> *flink run options*: -yD yarn.properties-file.location=mydir
>
>   * Tries to connect to local (non yarn) job manager (and fails)
>
> **
>
> *CASE 5*
>
> Even weirder:
>
> *flink-conf.yaml*: yarn.properties-file.location: mydir
>
> *flink run options*: -yD yarn.properties-file.location=mydir
>
>   * Still tries to connect to local (non yarn) job manager!
>
> What am I doing wrong?
>
> Logs extracts :
>
> *CASE 1:*
>
> 2018:02:01 15:43:20 - Waiting until all TaskManagers have connected
>
> 2018:02:01 15:43:20 - Starting client actor system.
>
> 2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:20 - Trying to select the network interface and 
> address to use by connecting to the leading JobManager.
>
> 2018:02:01 15:43:20 - TaskManager will try to connect for 10000 
> milliseconds before falling back to heuristics
>
> 2018:02:01 15:43:21 - Retrieved new target address 
> elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
>
> 2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:21 - Slf4jLogger started
>
> 2018:02:01 15:43:21 - Starting remoting
>
> 2018:02:01 15:43:21 - Remoting started; listening on addresses 
> :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]
>
> 2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:21 - TaskManager status (2/1)
>
> 2018:02:01 15:43:21 - All TaskManagers are connected
>
> 2018:02:01 15:43:21 - Submitting job with JobID: 
> f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.
>
> 2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: 
> f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a 
> JobManager yet.
>
> 2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST 
> (f69197b0b80a76319a87bde10c1e3f77).
>
> 2018:02:01 15:43:21 - Disconnect from JobManager null.
>
> 2018:02:01 15:43:21 - Connect to JobManager 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
>
> 2018:02:01 15:43:21 - Connected to JobManager at 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] 
> with leader session id 388af5b8-5555-4923-8ee4-8a4b9bfbb0b9.
>
> 2018:02:01 15:43:21 - Sending message to JobManager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager 
> to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and 
> wait for progress
>
> 2018:02:01 15:43:21 - Upload jar files to job manager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
>
> 2018:02:01 15:43:21 - Blob client connecting to 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
>
> 2018:02:01 15:43:22 - Submit job to the job manager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
>
> 2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was 
> successfully submitted to the JobManager akka://flink/deadLetters.
>
> 2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to 
> status RUNNING.
>
> *CASE 2:*
>
> 2018:02:01 15:48:43 - Waiting until all TaskManagers have connected
>
> 2018:02:01 15:48:43 - Starting client actor system.
>
> 2018:02:01 15:48:43 - Trying to select the network interface and 
> address to use by connecting to the leading JobManager.
>
> 2018:02:01 15:48:43 - TaskManager will try to connect for 10000 
> milliseconds before falling back to heuristics
>
> 2018:02:01 15:48:43 - Retrieved new target address 
> elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
>
> 2018:02:01 15:48:43 - Slf4jLogger started
>
> 2018:02:01 15:48:43 - Starting remoting
>
> 2018:02:01 15:48:43 - Remoting started; listening on addresses 
> :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:34140]
>
> 2018:02:01 15:48:43 - TaskManager status (2/1)
>
> 2018:02:01 15:48:43 - All TaskManagers are connected
>
> 2018:02:01 15:48:43 - Submitting job with JobID: 
> cd3e0e223c57d01d415fe7a6a308576c. Waiting for job completion.
>
> 2018:02:01 15:48:43 - Received SubmitJobAndWait(JobGraph(jobId: 
> cd3e0e223c57d01d415fe7a6a308576c)) but there is no connection to a 
> JobManager yet.
>
> 2018:02:01 15:48:43 - Received job SND-IMP-SIGNAST 
> (cd3e0e223c57d01d415fe7a6a308576c).
>
> 2018:02:01 15:48:43 - Disconnect from JobManager null.
>
> 2018:02:01 15:48:43 - Connect to JobManager 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
>
> 2018:02:01 15:48:43 - Connected to JobManager at 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] 
> with leader session id 00000000-0000-0000-0000-000000000000.
>
> 2018:02:01 15:48:43 - Sending message to JobManager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager 
> to submit job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c) and 
> wait for progress
>
> 2018:02:01 15:48:43 - Upload jar files to job manager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
>
> 2018:02:01 15:48:43 - Blob client connecting to 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
>
> 2018:02:01 15:48:45 - Submit job to the job manager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
>
> 2018:02:01 15:49:45 - Terminate JobClientActor.
>
> 2018:02:01 15:49:45 - Disconnect from JobManager 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
>
> Then
>
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: 
> Job submission to the JobManager timed out. You may increase 
> 'akka.client.timeout' in case the JobManager needs more time to 
> configure and confirm the job submission.
>
>         at 
> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>
>         at 
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>
>         at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>
>         at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>
>         at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>
>         at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> *CASE 3,4 *
>
> **
>
> 2018:02:01 15:35:14 - Starting client actor system.
>
> 2018:02:01 15:35:14 - Trying to select the network interface and 
> address to use by connecting to the leading JobManager.
>
> 2018:02:01 15:35:14 - TaskManager will try to connect for 10000 
> milliseconds before falling back to heuristics
>
> 2018:02:01 15:35:14 - Retrieved new target address 
> localhost/127.0.0.1:6123.
>
> 2018:02:01 15:35:15 - Trying to connect to address 
> localhost/127.0.0.1:6123
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> 'elara-edge-u2-n01/10.136.170.196': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': 
> Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': 
> Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/10.136.170.225': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas 
> accessible (connect failed)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/10.136.170.196': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': 
> Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': 
> Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/10.136.170.225': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas 
> accessible (connect failed)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/10.136.170.196': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': 
> Connexion refusée (Connection refused)
>
> **
>
>
> ------------------------------------------------------------------------
>
> L'intégrité de ce message n'étant pas assurée sur internet, la société 
> expéditrice ne peut être tenue responsable de son contenu ni de ses 
> pièces jointes. Toute utilisation ou diffusion non autorisée est 
> interdite. Si vous n'êtes pas destinataire de ce message, merci de le 
> détruire et d'avertir l'expéditeur.
>
> The integrity of this message cannot be guaranteed on the Internet. 
> The company that sent this message cannot therefore be held liable for 
> its content nor attachments. Any unauthorized use or dissemination is 
> prohibited. If you are not the intended recipient of this message, 
> then please delete it and notify the sender.



RE: How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,

Without any other solution, I made a shell script that copies the original content of FLINK_CONF_DIR in a temporary rep, modify flink-conf.yaml to set yarn.properties-file.location, and change FLINK_CONF_DIR to that temp rep before executing flink.
I am now able to select the container I want, but I think it should be made simpler...
I'll open a Jira.

Best regards,
Arnaud


De : LINZ, Arnaud
Envoyé : jeudi 1 février 2018 16:23
À : user@flink.apache.org
Objet : How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?

Hello,

I am using Flink 1.3.2 and I'm struggling to achieve something that should be simple.
For isolation reasons, I want to start multiple long living yarn session containers (with the same user) and choose at run-time, when I start a HA streaming app, which container will hold it.

I start my yarn session with the command line option : -Dyarn.properties-file.location=mydir
The session is created and a .yarn-properties-$USER file is generated.

And I've tried the following to submit my job:

CASE 1
flink-conf.yaml : yarn.properties-file.location: mydir
flink run options : none

  *   Uses zookeeper and works  - but I cannot choose the container as the property file is global.

CASE 2
flink-conf.yaml : nothing
flink run options : -yid applicationId

  *   Do not use zookeeper, tries to connect to yarn job manager but fails in "Job submission to the JobManager timed out" error

CASE 3
flink-conf.yaml : nothing
flink run options : -yid applicationId and -yD with all dynamic properties found in the "dynamicPropertiesString" of .yarn-properties-$USER file

  *   Same as case 2

CASE 4
flink-conf.yaml : nothing
flink run options : -yD yarn.properties-file.location=mydir

  *   Tries to connect to local (non yarn) job manager (and fails)

CASE 5
Even weirder:
flink-conf.yaml : yarn.properties-file.location: mydir
flink run options : -yD yarn.properties-file.location=mydir

  *   Still tries to connect to local (non yarn) job manager!

What am I doing wrong?

Logs extracts :
CASE 1:
2018:02:01 15:43:20 - Waiting until all TaskManagers have connected
2018:02:01 15:43:20 - Starting client actor system.
2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:20 - Trying to select the network interface and address to use by connecting to the leading JobManager.
2018:02:01 15:43:20 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
2018:02:01 15:43:21 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Slf4jLogger started
2018:02:01 15:43:21 - Starting remoting
2018:02:01 15:43:21 - Remoting started; listening on addresses :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]
2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - TaskManager status (2/1)
2018:02:01 15:43:21 - All TaskManagers are connected
2018:02:01 15:43:21 - Submitting job with JobID: f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.
2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager yet.
2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77).
2018:02:01 15:43:21 - Disconnect from JobManager null.
2018:02:01 15:43:21 - Connect to JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
2018:02:01 15:43:21 - Connected to JobManager at Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 388af5b8-5555-4923-8ee4-8a4b9bfbb0b9.
2018:02:01 15:43:21 - Sending message to JobManager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and wait for progress
2018:02:01 15:43:21 - Upload jar files to job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:43:21 - Blob client connecting to akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
2018:02:01 15:43:22 - Submit job to the job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was successfully submitted to the JobManager akka://flink/deadLetters.
2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to status RUNNING.

CASE 2:
2018:02:01 15:48:43 - Waiting until all TaskManagers have connected
2018:02:01 15:48:43 - Starting client actor system.
2018:02:01 15:48:43 - Trying to select the network interface and address to use by connecting to the leading JobManager.
2018:02:01 15:48:43 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
2018:02:01 15:48:43 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
2018:02:01 15:48:43 - Slf4jLogger started
2018:02:01 15:48:43 - Starting remoting
2018:02:01 15:48:43 - Remoting started; listening on addresses :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:34140]
2018:02:01 15:48:43 - TaskManager status (2/1)
2018:02:01 15:48:43 - All TaskManagers are connected
2018:02:01 15:48:43 - Submitting job with JobID: cd3e0e223c57d01d415fe7a6a308576c. Waiting for job completion.
2018:02:01 15:48:43 - Received SubmitJobAndWait(JobGraph(jobId: cd3e0e223c57d01d415fe7a6a308576c)) but there is no connection to a JobManager yet.
2018:02:01 15:48:43 - Received job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c).
2018:02:01 15:48:43 - Disconnect from JobManager null.
2018:02:01 15:48:43 - Connect to JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
2018:02:01 15:48:43 - Connected to JobManager at Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 00000000-0000-0000-0000-000000000000.
2018:02:01 15:48:43 - Sending message to JobManager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c) and wait for progress
2018:02:01 15:48:43 - Upload jar files to job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:48:43 - Blob client connecting to akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
2018:02:01 15:48:45 - Submit job to the job manager akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:49:45 - Terminate JobClientActor.
2018:02:01 15:49:45 - Disconnect from JobManager Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].

Then
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
        at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
        at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
        at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

CASE 3,4

2018:02:01 15:35:14 - Starting client actor system.
2018:02:01 15:35:14 - Trying to select the network interface and address to use by connecting to the leading JobManager.
2018:02:01 15:35:14 - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
2018:02:01 15:35:14 - Retrieved new target address localhost/127.0.0.1:6123.
2018:02:01 15:35:15 - Trying to connect to address localhost/127.0.0.1:6123
2018:02:01 15:35:15 - Failed to connect from address 'elara-edge-u2-n01/10.136.170.196': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.225': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)
2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.196': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.225': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas accessible (connect failed)
2018:02:01 15:35:15 - Failed to connect from address '/10.136.170.196': Connexion refusée (Connection refused)
2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': Connexion refusée (Connection refused)





________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.