Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2020/10/19 06:22:31 UTC

HA on AWS EMR

Hi,

I'm trying to enable HA for my Flink jobs running on AWS EMR.
Following [1], I created a common Flink YARN session and am submitting all my
jobs to it. I added these four configuration parameters:

    high-availability = zookeeper
    high-availability.storageDir =
    high-availability.zookeeper.path.root = /flink
    high-availability.zookeeper.quorum = <EMR's master node's DNS name>:2181

(I used the ZooKeeper that comes with EMR.)

The command to start the Flink YARN session looks like this:

    flink-yarn-session -Dtaskmanager.memory.process.size=4g -nm FlinkCommonSession -z FlinkCommonSession -d

The first HA test (YARN application killed) went well. I killed the common
session with `yarn application --kill <appId>` and created a new session with
the same command; the jobs were restored automatically once that session was
up.

However, the second HA test (EMR cluster crashed) didn't work: the jobs were
not restored after the common session was created on the new EMR cluster.
(JobManager log attached: jobmanager.gz
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/jobmanager.gz>)

Should I expect the jobs to be restored in scenario 2 (EMR cluster crashed)?
Am I missing something here?

Thanks for your help.

Regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: HA on AWS EMR

Posted by Averell <lv...@gmail.com>.
Hello Robert,

Thanks for the info. That makes sense. I will take savepoints of my jobs and
cancel them on 1.10, upgrade to 1.11, and then restore the jobs from those
savepoints.

Thanks and regards,
Averell
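
A minimal sketch of that savepoint-based upgrade path, using the standard Flink CLI actions `flink stop -p` (stop the job and take a savepoint in one step) and `flink run -s` (start a job from a savepoint). The function names, job ID, jar and S3 paths are hypothetical placeholders; the sketch also assumes the `flink` client can locate the running YARN session (for example through the YARN properties file a detached session writes). `FLINK_BIN` is a hypothetical override that makes a dry run possible with `FLINK_BIN=echo`.

```shell
#!/usr/bin/env bash
# Sketch: move a job from a Flink 1.10 session to a Flink 1.11 session via a savepoint.
# Assumptions (hypothetical): the flink client talks to the intended YARN session,
# and FLINK_BIN may be overridden (e.g. FLINK_BIN=echo to print instead of run).
FLINK_BIN="${FLINK_BIN:-flink}"

# Step 1 (use the 1.10 client): stop the job and write a savepoint
# into the given target directory. The CLI prints the savepoint path.
stop_with_savepoint() {
  local job_id="$1" target_dir="$2"
  "$FLINK_BIN" stop -p "$target_dir" "$job_id"
}

# Step 2 (use the 1.11 client): resubmit the job in detached mode,
# restoring its state from the savepoint path printed in step 1.
# Any extra arguments are passed through to the job.
resume_from_savepoint() {
  local savepoint_path="$1" job_jar="$2"
  shift 2
  "$FLINK_BIN" run -d -s "$savepoint_path" "$job_jar" "$@"
}

# Example (hypothetical IDs and paths):
#   stop_with_savepoint 6e5c12f1c352dd4e6200c40aebb65745 s3://my-bucket/flink/savepoints
#   resume_from_savepoint s3://my-bucket/flink/savepoints/savepoint-6e5c12-0123456789ab my-job.jar
```

Using stop-with-savepoint rather than a separate savepoint plus cancel avoids a window in which the job keeps processing after the savepoint was taken.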




Re: HA on AWS EMR

Posted by Robert Metzger <rm...@apache.org>.
Hey Averell,

To clarify: you should be able to migrate from 1.10 to 1.11 using a
savepoint. Restoring with a newer Flink version from the state stored in
ZooKeeper (for HA) won't work.

On Mon, Oct 26, 2020 at 5:05 PM Robert Metzger <rm...@apache.org> wrote:

> Hey Averell,
>
> you should be able to migrate savepoints from Flink 1.10 to 1.11.
>
> Is there a simple way for me to reproduce this issue locally? This seems
> to be a rare, but probably valid issue. Are you using any special
> operators? (like the new source API?)
>
> Best,
> Robert
>
> On Wed, Oct 21, 2020 at 11:07 AM Averell <lv...@gmail.com> wrote:
>
>> [...]
>

Re: HA on AWS EMR

Posted by Robert Metzger <rm...@apache.org>.
Hey Averell,

you should be able to migrate savepoints from Flink 1.10 to 1.11.

Is there a simple way for me to reproduce this issue locally? This seems to
be a rare but probably valid issue. Are you using any special operators
(like the new source API)?

Best,
Robert

On Wed, Oct 21, 2020 at 11:07 AM Averell <lv...@gmail.com> wrote:

> [...]
>

Re: HA on AWS EMR

Posted by Averell <lv...@gmail.com>.
Hello Roman,

Thanks for the answer.
I already have high-availability.storageDir configured to an S3
location. Our service is not that critical, so to save cost we are
using a single-master EMR setup. I understand that we won't get YARN HA
in that case; what I'm after is the ability to restore the service
quickly after a failure. Without ZooKeeper, when such a failure
happens, I need to find the last checkpoint of each job and restore from
it. With the HA feature, I can just start a new flink-yarn-session and
all jobs are restored.
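
Without the HA pointers, "finding the last checkpoint of each job" can be scripted. A minimal sketch, assuming Flink's default checkpoint directory layout `<state.checkpoints.dir>/<job-id>/chk-<n>/_metadata` and treating a `chk-<n>` directory as complete only if it contains `_metadata`. The input is a plain list of object paths on stdin (however you choose to list the bucket), so no particular S3 tooling is assumed; the function name is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: pick the newest *completed* checkpoint of each job from a path listing.
# Assumes the layout <checkpoint-dir>/<job-id>/chk-<n>/_metadata; a chk-<n>
# directory without _metadata is treated as incomplete and ignored.
# Input: one path per line on stdin. Output: "<job-id> <checkpoint-dir>" per job, sorted.
latest_checkpoints() {
  awk '
    {
      n = split($0, part, "/")
      # Only paths ending in chk-<n>/_metadata mark completed checkpoints.
      if (n < 3 || part[n] != "_metadata" || part[n-1] !~ /^chk-[0-9]+$/) next
      job = part[n-2]
      num = substr(part[n-1], 5) + 0   # numeric part of "chk-<n>"
      if (!(job in best) || num > best[job]) {
        best[job] = num
        # Drop the trailing "/_metadata" to get the checkpoint directory.
        dir[job] = substr($0, 1, length($0) - length("/_metadata"))
      }
    }
    END { for (job in dir) print job, dir[job] }
  ' | sort
}
```

The comparison is numeric, so `chk-10` correctly wins over `chk-9`. Each printed directory should then be usable as the `-s` argument of `flink run`, just like a savepoint path.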

I tried changing the ZooKeeper dataDir setting to an EFS location that both
the old and new EMR clusters can access, and that worked.
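
To check whether the HA "pointers" actually survived a cluster replacement, the job-graph znodes can be listed before starting the new session. A minimal sketch, assuming Flink's default ZooKeeper layout `<high-availability.zookeeper.path.root>/<cluster-id>/jobgraphs` (here `/flink` and the `-z FlinkCommonSession` namespace from the original post) and the stock ZooKeeper CLI `zkCli.sh`. The CLI's name and location on EMR may differ, so `ZK_CLI` is a hypothetical override; the function names and host are hypothetical too.

```shell
#!/usr/bin/env bash
# Sketch: list the job IDs whose JobGraphs Flink HA has registered in ZooKeeper.
# Assumptions: default znode layout <path.root>/<cluster-id>/jobgraphs, and a
# ZooKeeper CLI named zkCli.sh (override via the hypothetical ZK_CLI variable).
ZK_CLI="${ZK_CLI:-zkCli.sh}"

# Build the znode path under which Flink keeps the JobGraph pointers.
jobgraphs_znode() {
  local path_root="$1" cluster_id="$2"
  # Strip a trailing slash from the root so the result never contains "//".
  printf '%s/%s/jobgraphs\n' "${path_root%/}" "$cluster_id"
}

# Ask ZooKeeper for the children of that znode (one child per recoverable job).
list_recoverable_jobs() {
  local quorum="$1" path_root="$2" cluster_id="$3"
  "$ZK_CLI" -server "$quorum" ls "$(jobgraphs_znode "$path_root" "$cluster_id")"
}

# Example (hypothetical host name):
#   list_recoverable_jobs ip-10-0-0-1.ec2.internal:2181 /flink FlinkCommonSession
```

An empty child list on the new cluster means the new session has nothing to recover, which matches what happens when the ZooKeeper dataDir lived on the lost master node.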

However, now I have a new question: should I expect to be able to restore
into a newer version of Flink (e.g. state saved with Flink 1.10 and restored
with Flink 1.11)? I tried, and got the errors below. I'm not sure whether
that's a bug or expected behaviour.

Thanks and best regards,
Averell

============
07:39:33.906 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed
07:40:11.585 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Could not start recovered job 6e5c12f1c352dd4e6200c40aebb65745.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$handleRecoveredJobStartError$0(Dispatcher.java:222) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:753) ~[?:1.8.0_265]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_265]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        ... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        ... 4 more
Caused by: java.lang.NullPointerException
        at java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1028) ~[?:1.8.0_265]
        at java.util.Collections$UnmodifiableList.<init>(Collections.java:1304) ~[?:1.8.0_265]
        at java.util.Collections.unmodifiableList(Collections.java:1289) ~[?:1.8.0_265]
        at org.apache.flink.runtime.jobgraph.JobVertex.getOperatorCoordinators(JobVertex.java:352) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:232) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:814) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_265]
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) ~[flink-dist_2.11-1.11.0.jar:1.11.0]




Re: HA on AWS EMR

Posted by Khachatryan Roman <kh...@gmail.com>.
Hello Averell,

I don't think ZK data is stored on a master node. Flink JM data is usually
stored on a DFS, as configured by "high-availability.storageDir" [1].
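A minimal flink-conf.yaml sketch of the point above, assuming S3 is used as the DFS and that the s3:// scheme is usable from Flink on the cluster (on EMR this is normally EMRFS). The bucket name `my-flink-ha-bucket` is hypothetical. The idea is that `high-availability.storageDir` should point at storage that outlives the EMR cluster, rather than at the cluster's own HDFS.

```yaml
# flink-conf.yaml (sketch; bucket name is hypothetical)
high-availability: zookeeper
# JobManager HA metadata is written here; S3 survives termination of the
# EMR cluster, whereas the cluster's own HDFS does not.
high-availability.storageDir: s3://my-flink-ha-bucket/flink/ha/
```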

In either case, for Flink to be highly available, YARN must be highly
available too, and I don't think that is the case with a single master node.
Please consider a multi-master EMR setup [2].
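A sketch of the Flink side of a multi-master setup, assuming ZooKeeper runs on all three EMR master nodes. The host names below are hypothetical placeholders. Listing every master in the quorum (instead of a single master's DNS name) lets Flink keep reaching ZooKeeper when one master is lost, since a three-server ensemble tolerates one failure.

```yaml
high-availability: zookeeper
# Hypothetical host names for the three EMR master nodes; assumes ZooKeeper
# runs on each of them.
high-availability.zookeeper.quorum: master-1.internal:2181,master-2.internal:2181,master-3.internal:2181
high-availability.zookeeper.path.root: /flink
```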

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#high-availability-storagedir
[2] https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-ha.html

Regards,
Roman


On Tue, Oct 20, 2020 at 12:13 AM Averell <lv...@gmail.com> wrote:

> Hello Roman,
>
> Thanks for your time.
> I'm using EMR 5.30.1 (Flink 1.10.0) with 1 master node.
> /yarn.application-attempts/ is not set (does that means unlimited?), while
> /yarn.resourcemanager.am.max-attempts/ is 4.
>
> In saying "EMR cluster crashed) I meant the cluster is lost. Some scenarios
> which could lead to this are:
>   - The master node is down
>   - The cluster is accidentally / deliberately terminated.
>
> I found a thread in our mailing list [1], in which Fabian mentioned a
> /"pointer"/ stored in Zookeeper. It looks like this piece of information is
> stored in Zookeeper's dataDir, which is by default stored in the local
> storage of the EMR's master node. I'm trying to move this one to an EFS, in
> hope that it would help. Not sure whether this is a right approach.
>
> Thanks for your help.
> Regards,
> Averell
>
>
> [1]
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HA-and-zookeeper-tp27093p27119.html
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: HA on AWS EMR

Posted by Averell <lv...@gmail.com>.
Hello Roman,

Thanks for your time.
I'm using EMR 5.30.1 (Flink 1.10.0) with 1 master node.
`yarn.application-attempts` is not set (does that mean unlimited?), while
`yarn.resourcemanager.am.max-attempts` is 4.
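A sketch of setting the attempt count explicitly rather than relying on the default. Flink's configuration reference states that the number of ApplicationMaster restarts is also limited by YARN's `yarn.resourcemanager.am.max-attempts`, so with that set to 4 the effective limit cannot exceed 4. The value below is only an illustrative choice.

```yaml
# flink-conf.yaml: number of ApplicationMaster (JobManager) attempts for the
# YARN session. YARN caps this via yarn.resourcemanager.am.max-attempts
# (configured in yarn-site.xml), so larger values here have no extra effect.
yarn.application-attempts: 4
```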

By "EMR cluster crashed" I meant that the cluster is lost. Some scenarios
that could lead to this are:
  - The master node goes down.
  - The cluster is accidentally or deliberately terminated.

I found a thread on our mailing list [1] in which Fabian mentioned a
"pointer" stored in ZooKeeper. It looks like this information is kept in
ZooKeeper's dataDir, which by default is on the local storage of the EMR
master node. I'm trying to move it to EFS in the hope that it helps, though
I'm not sure whether this is the right approach.
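To make the "pointer" idea concrete, here is a sketch of the keys involved, as I understand Flink's ZooKeeper HA services: ZooKeeper holds only small pointers under `<path.root>/<cluster-id>`, while the data they refer to lives in `high-availability.storageDir`. The `-z FlinkCommonSession` flag passed to the YARN session sets the cluster id (the ZooKeeper namespace). If the ZooKeeper data is lost, the files in storageDir remain but nothing points to them, so a new session has nothing to recover. The bucket name is hypothetical.

```yaml
high-availability: zookeeper
# Znodes live under <path.root>/<cluster-id>; they hold pointers to
# submitted job graphs and checkpoint metadata, not the data itself.
high-availability.zookeeper.path.root: /flink
# Same effect as passing -z FlinkCommonSession when starting the session.
high-availability.cluster-id: FlinkCommonSession
# The files those pointers refer to (hypothetical bucket).
high-availability.storageDir: s3://my-flink-ha-bucket/flink/ha/
```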

Thanks for your help.
Regards,
Averell


[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HA-and-zookeeper-tp27093p27119.html




Re: HA on AWS EMR

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi,

Could you explain what "EMR cluster crashed" means in the 2nd scenario?
Could you also share:
- yarn.application-attempts in Flink
- yarn.resourcemanager.am.max-attempts in YARN
- the number of EMR master nodes (1 or 3)
- the EMR version

Regards,
Roman