Posted to user@flink.apache.org by Chesnay Schepler <ch...@apache.org> on 2018/02/07 13:52:22 UTC

Re: How to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job ?

For future reference, the created JIRA: 
https://issues.apache.org/jira/browse/FLINK-8580

On 07.02.2018 10:48, LINZ, Arnaud wrote:
>
> Hi,
>
> Lacking any other solution, I wrote a shell script that copies the
> original content of FLINK_CONF_DIR into a temporary directory, modifies
> flink-conf.yaml to set yarn.properties-file.location, and points
> FLINK_CONF_DIR to that temporary directory before executing flink.
>
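
A minimal sketch of the wrapper described above, assuming bash and a flink-conf.yaml located directly in FLINK_CONF_DIR. The function name make_session_conf_dir and the jar name in the usage comment are made up for illustration; this is not the script that was actually used.

```shell
#!/usr/bin/env bash
# Sketch only: clone a Flink conf dir and point the copy at one YARN session.
# make_session_conf_dir is an illustrative name, not part of Flink.

# make_session_conf_dir SRC_CONF_DIR PROPS_DIR
# Prints the path of a temporary copy of SRC_CONF_DIR whose flink-conf.yaml
# sets yarn.properties-file.location to PROPS_DIR.
make_session_conf_dir() {
    local src_conf_dir="$1" props_dir="$2" tmp_conf_dir
    tmp_conf_dir="$(mktemp -d)" || return 1
    # Copy the original configuration, hidden files included.
    cp -R "$src_conf_dir"/. "$tmp_conf_dir"/ || return 1
    # Drop any existing setting, then append the session-specific one.
    sed '/^yarn\.properties-file\.location:/d' "$src_conf_dir/flink-conf.yaml" \
        > "$tmp_conf_dir/flink-conf.yaml" || return 1
    echo "yarn.properties-file.location: $props_dir" \
        >> "$tmp_conf_dir/flink-conf.yaml"
    echo "$tmp_conf_dir"
}

# Usage (hypothetical jar name; mydir is the directory given to the session):
#   conf_dir="$(make_session_conf_dir "$FLINK_CONF_DIR" mydir)"
#   FLINK_CONF_DIR="$conf_dir" flink run my-job.jar
#   rm -rf "$conf_dir"
```

The original FLINK_CONF_DIR is never modified, so several submissions targeting different sessions can run side by side, each with its own temporary copy.
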
> I am now able to select the container I want, but I think it should be 
> made simpler…
>
> I’ll open a Jira.
>
> Best regards,
>
> Arnaud
>
> *From:* LINZ, Arnaud
> *Sent:* Thursday, February 1, 2018 16:23
> *To:* user@flink.apache.org
> *Subject:* How to handle multiple yarn sessions and choose at runtime 
> the one to submit a ha streaming job ?
>
> Hello,
>
> I am using Flink 1.3.2 and I’m struggling to achieve something that 
> should be simple.
>
> For isolation reasons, I want to start multiple long-lived yarn
> session containers (with the same user) and choose at runtime, when I
> start an HA streaming app, which container will host it.
>
> I start my yarn session with the command line option:
> -Dyarn.properties-file.location=mydir
>
> The session is created and a .yarn-properties-$USER file is generated.
>
> And I’ve tried the following to submit my job:
>
> *CASE 1*
>
> *flink-conf.yaml*: yarn.properties-file.location: mydir
>
> *flink run options*: none
>
>   * Uses ZooKeeper and works, but I cannot choose the container because
>     the properties file is global.
>
> *CASE 2*
>
> *flink-conf.yaml*: nothing
>
> *flink run options*: -yid applicationId
>
>   * Does not use ZooKeeper; tries to connect to the YARN JobManager but
>     fails with a “Job submission to the JobManager timed out” error
>
> *CASE 3*
>
> *flink-conf.yaml*: nothing
>
> *flink run options*: -yid applicationId and -yD with all dynamic 
> properties found in the “dynamicPropertiesString” of 
> .yarn-properties-$USER file
>
>   * Same as case 2
>
> *CASE 4*
>
> *flink-conf.yaml*: nothing
>
> *flink run options*: -yD yarn.properties-file.location=mydir
>
>   * Tries to connect to a local (non-YARN) JobManager, and fails
>
> *CASE 5*
>
> Even weirder:
>
> *flink-conf.yaml*: yarn.properties-file.location: mydir
>
> *flink run options*: -yD yarn.properties-file.location=mydir
>
>   * Still tries to connect to a local (non-YARN) JobManager!
>
> What am I doing wrong?
>
> Log extracts:
>
> *CASE 1:*
>
> 2018:02:01 15:43:20 - Waiting until all TaskManagers have connected
>
> 2018:02:01 15:43:20 - Starting client actor system.
>
> 2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:20 - Trying to select the network interface and 
> address to use by connecting to the leading JobManager.
>
> 2018:02:01 15:43:20 - TaskManager will try to connect for 10000 
> milliseconds before falling back to heuristics
>
> 2018:02:01 15:43:21 - Retrieved new target address 
> elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
>
> 2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:21 - Slf4jLogger started
>
> 2018:02:01 15:43:21 - Starting remoting
>
> 2018:02:01 15:43:21 - Remoting started; listening on addresses 
> :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]
>
> 2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:21 - TaskManager status (2/1)
>
> 2018:02:01 15:43:21 - All TaskManagers are connected
>
> 2018:02:01 15:43:21 - Submitting job with JobID: 
> f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.
>
> 2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
>
> 2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: 
> f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a 
> JobManager yet.
>
> 2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST 
> (f69197b0b80a76319a87bde10c1e3f77).
>
> 2018:02:01 15:43:21 - Disconnect from JobManager null.
>
> 2018:02:01 15:43:21 - Connect to JobManager 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
>
> 2018:02:01 15:43:21 - Connected to JobManager at 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] 
> with leader session id 388af5b8-5555-4923-8ee4-8a4b9bfbb0b9.
>
> 2018:02:01 15:43:21 - Sending message to JobManager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager 
> to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and 
> wait for progress
>
> 2018:02:01 15:43:21 - Upload jar files to job manager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
>
> 2018:02:01 15:43:21 - Blob client connecting to 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
>
> 2018:02:01 15:43:22 - Submit job to the job manager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
>
> 2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was 
> successfully submitted to the JobManager akka://flink/deadLetters.
>
> 2018:02:01 15:43:22 - 02/01/2018 15:43:22   Job execution switched to 
> status RUNNING.
>
> *CASE 2:*
>
> 2018:02:01 15:48:43 - Waiting until all TaskManagers have connected
>
> 2018:02:01 15:48:43 - Starting client actor system.
>
> 2018:02:01 15:48:43 - Trying to select the network interface and 
> address to use by connecting to the leading JobManager.
>
> 2018:02:01 15:48:43 - TaskManager will try to connect for 10000 
> milliseconds before falling back to heuristics
>
> 2018:02:01 15:48:43 - Retrieved new target address 
> elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
>
> 2018:02:01 15:48:43 - Slf4jLogger started
>
> 2018:02:01 15:48:43 - Starting remoting
>
> 2018:02:01 15:48:43 - Remoting started; listening on addresses 
> :[akka.tcp://flink@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:34140]
>
> 2018:02:01 15:48:43 - TaskManager status (2/1)
>
> 2018:02:01 15:48:43 - All TaskManagers are connected
>
> 2018:02:01 15:48:43 - Submitting job with JobID: 
> cd3e0e223c57d01d415fe7a6a308576c. Waiting for job completion.
>
> 2018:02:01 15:48:43 - Received SubmitJobAndWait(JobGraph(jobId: 
> cd3e0e223c57d01d415fe7a6a308576c)) but there is no connection to a 
> JobManager yet.
>
> 2018:02:01 15:48:43 - Received job SND-IMP-SIGNAST 
> (cd3e0e223c57d01d415fe7a6a308576c).
>
> 2018:02:01 15:48:43 - Disconnect from JobManager null.
>
> 2018:02:01 15:48:43 - Connect to JobManager 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
>
> 2018:02:01 15:48:43 - Connected to JobManager at 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] 
> with leader session id 00000000-0000-0000-0000-000000000000.
>
> 2018:02:01 15:48:43 - Sending message to JobManager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager 
> to submit job SND-IMP-SIGNAST (cd3e0e223c57d01d415fe7a6a308576c) and 
> wait for progress
>
> 2018:02:01 15:48:43 - Upload jar files to job manager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
>
> 2018:02:01 15:48:43 - Blob client connecting to 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
>
> 2018:02:01 15:48:45 - Submit job to the job manager 
> akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
>
> 2018:02:01 15:49:45 - Terminate JobClientActor.
>
> 2018:02:01 15:49:45 - Disconnect from JobManager 
> Actor[akka.tcp://flink@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
>
> Then
>
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: 
> Job submission to the JobManager timed out. You may increase 
> 'akka.client.timeout' in case the JobManager needs more time to 
> configure and confirm the job submission.
>
>         at 
> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>
>         at 
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>
>         at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>
>         at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>
>         at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>
>         at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>
>         at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> *CASE 3, 4*
>
> 2018:02:01 15:35:14 - Starting client actor system.
>
> 2018:02:01 15:35:14 - Trying to select the network interface and 
> address to use by connecting to the leading JobManager.
>
> 2018:02:01 15:35:14 - TaskManager will try to connect for 10000 
> milliseconds before falling back to heuristics
>
> 2018:02:01 15:35:14 - Retrieved new target address 
> localhost/127.0.0.1:6123.
>
> 2018:02:01 15:35:15 - Trying to connect to address 
> localhost/127.0.0.1:6123
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> 'elara-edge-u2-n01/10.136.170.196': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': 
> Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': 
> Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/10.136.170.225': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas 
> accessible (connect failed)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/10.136.170.196': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': 
> Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/192.168.117.1': 
> Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/10.136.170.225': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/fe80:0:0:0:20c:29ff:fe8f:3fdd%ens192': Le réseau n'est pas 
> accessible (connect failed)
>
> 2018:02:01 15:35:15 - Failed to connect from address 
> '/10.136.170.196': Connexion refusée (Connection refused)
>
> 2018:02:01 15:35:15 - Failed to connect from address '/127.0.0.1': 
> Connexion refusée (Connection refused)
>