Posted to user@flink.apache.org by Stephen Connolly <st...@gmail.com> on 2020/02/07 11:40:42 UTC

Rescaling a running topology

So I am looking at the Flink Management REST API... and, as I see it, there
are two paths to rescale a running topology:

1. Stop the topology with a savepoint and then start it up again from that
savepoint with the new parallelism; or
2. Use the /jobs/:jobid/rescaling endpoint
<https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-rescaling>
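
For reference, here is a minimal sketch of how a request for the second path
could be built, assuming the Flink 1.8 REST API as described in the
documentation linked above (a PATCH request with a mandatory parallelism query
parameter). The helper name rescaling_request and the base URL are
illustrative only and are not part of Flink.

```python
from urllib.parse import quote, urlencode


def rescaling_request(base_url, job_id, parallelism):
    """Return the (method, url) pair for a rescale request.

    Assumption: Flink 1.8 REST API, PATCH /jobs/:jobid/rescaling with a
    mandatory 'parallelism' query parameter.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be a positive integer")
    query = urlencode({"parallelism": parallelism})
    url = f"{base_url.rstrip('/')}/jobs/{quote(job_id, safe='')}/rescaling?{query}"
    return "PATCH", url


# Hypothetical local JobManager address and job id, for illustration.
method, url = rescaling_request(
    "http://localhost:8081", "ebc20a700c334f61ea03ecdf3d8939ca", 2
)
print(method, url)
```

The helper only builds the request; sending it (and polling the returned
trigger id) is left to whatever HTTP client the automation already uses.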

The first one seems to work just fine.

The second one seems to just blow up every time I try to use it... I'll get
things like:

https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-log-txt

The above was for the topology
https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java
running with options:

    --source parallel

Things are even worse with --source iterator, as that has no checkpoint
state to recover from.

Right now I am trying to discover what preconditions must be met in order
to safely call the rescaling endpoint and actually have it work... I should
note that I have not yet managed to get it to work at all!!!

One of the things we are trying to do is add some automation to enable
scale-up / down as we see surges in processing load. We want to have an
automated system that can respond to those situations automatically for low
deltas and trigger an on-call engineer for persistent excess load. In that
regard, I'd like to know what the automation should check to decide whether
it can rescale via the dedicated endpoint or whether it should use the
reliable (but presumably slower) path of stop with savepoint and start from
savepoint.
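
To make the intended policy concrete, here is a rough sketch of the decision
described above: rescale automatically for small excursions and page a human
when the excess load persists. Everything in it is an assumption made for
illustration: the load ratio metric (observed load divided by current
capacity), the thresholds, and the names Action and choose_action.

```python
from enum import Enum


class Action(Enum):
    NONE = "none"
    RESCALE = "rescale"
    PAGE_ON_CALL = "page_on_call"


def choose_action(load_ratios, low_delta=1.5, persistent_samples=3):
    """Pick an action from recent load samples, oldest first.

    load_ratios: observed load / current capacity per sample (hypothetical
    metric). Ratios above 1.0 mean the job is falling behind.
    """
    if not load_ratios:
        return Action.NONE
    recent = load_ratios[-persistent_samples:]
    # Excess beyond the "low delta" band for several samples in a row:
    # automation has not coped, so hand over to a human.
    if len(recent) == persistent_samples and all(r > low_delta for r in recent):
        return Action.PAGE_ON_CALL
    # Otherwise any current excess is handled by rescaling automatically.
    if load_ratios[-1] > 1.0:
        return Action.RESCALE
    return Action.NONE
```

The sketch deliberately says nothing about which rescaling path to use; it
only separates the automatic case from the on-call case.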

The job I have been using
(https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java)
is just a quick job to let me test the automation on a local cluster. It is
designed to output a strictly increasing sequence of numbers without missing
any, optionally double them, and then print them out. The different sources
are me experimenting with different types of operator to see what kinds of
topology can work with the rescaling endpoint.
Thanks in advance

Re: Rescaling a running topology

Posted by Andrey Zagrebin <az...@apache.org>.
Hi Stephen,

I am sorry that you had this experience with the rescale API.
Unfortunately, the rescale API has always been experimental and has some
flaws. Recently, the Flink community decided to disable it temporarily with
the 1.9 release; see the explanation here [1].

I would advise manual rescaling (path 1 in your original message).
Technically, the rescale operation skips some steps, such as job master
startup and uploading job artefacts, but it otherwise does much the same
thing as the manual workflow:
1. take a savepoint
2. redeploy the tasks with the new parallelism from that savepoint
So in practice there should not be a big difference, although whether the
rescale operation is faster depends on the job, of course.
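
The two steps of the manual workflow could be scripted roughly as follows.
This is only a sketch under stated assumptions: it targets the REST endpoints
of Flink 1.8 (POST /jobs/:jobid/savepoints with cancel-job, then
POST /jars/:jarid/run), the job jar is assumed to be already uploaded, and
http is a caller-supplied function (method, path, json_body) -> dict, so no
particular HTTP library is implied. The function name rescale_via_savepoint
is hypothetical.

```python
import time


def rescale_via_savepoint(http, job_id, jar_id, target_dir, parallelism,
                          poll_interval=1.0, max_polls=60):
    """Cancel a job with a savepoint, then resubmit it with a new parallelism.

    http: caller-supplied function (method, path, json_body) -> parsed JSON.
    Returns the id of the newly submitted job.
    """
    # Step 1: trigger a savepoint and cancel the job once it completes.
    trigger = http("POST", f"/jobs/{job_id}/savepoints",
                   {"target-directory": target_dir, "cancel-job": True})
    request_id = trigger["request-id"]

    # The trigger is asynchronous, so poll until the savepoint is done.
    for _ in range(max_polls):
        status = http("GET", f"/jobs/{job_id}/savepoints/{request_id}", None)
        if status["status"]["id"] == "COMPLETED":
            break
        time.sleep(poll_interval)
    else:
        raise TimeoutError("savepoint did not complete in time")

    operation = status["operation"]
    if "failure-cause" in operation:
        raise RuntimeError(f"savepoint failed: {operation['failure-cause']}")

    # Step 2: resubmit the uploaded jar from that savepoint.
    run = http("POST", f"/jars/{jar_id}/run",
               {"savepointPath": operation["location"],
                "parallelism": parallelism})
    return run["jobid"]
```

Program arguments such as --source parallel would also have to be passed
again on resubmission; the sketch leaves them out for brevity.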

Thanks,
Andrey

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html

On Fri, Feb 7, 2020 at 1:31 PM Stephen Connolly <
stephen.alan.connolly@gmail.com> wrote:

> Ooooh more fun... If I rescale down a job, the job's config at
> jobs/{jobid}/config does not reflect the new parallelism (there may not
> even be any way to detect such a parallelism change)... but more critically
> the job is now unstoppable and seems to end up stuck in the CANCELLING
> state for some time (I gave up waiting)

Re: Rescaling a running topology

Posted by Stephen Connolly <st...@gmail.com>.
Ooooh more fun... If I rescale down a job, the job's config at
jobs/{jobid}/config does not reflect the new parallelism (there may not
even be any way to detect such a parallelism change)... but more critically
the job is now unstoppable and seems to end up stuck in the CANCELLING
state for some time (I gave up waiting)
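
For automation that has to cope with a job hanging like this, a bounded wait
is one option. The sketch below polls the job state until it reaches a wanted
state or a timeout expires, and reads per-vertex parallelism from the job
details instead of the job config. It assumes that GET /jobs/:jobid returns a
state field and a vertices list with name and parallelism entries; whether
those values reflect a rescale is not confirmed here. The names wait_for_state
and vertex_parallelism, and the get_job callable, are hypothetical.

```python
import time


def wait_for_state(get_job, wanted, timeout_s=60.0, poll_interval=1.0,
                   clock=time.monotonic, sleep=time.sleep):
    """Poll until the job reaches one of the wanted states or time runs out.

    get_job: caller-supplied function returning the parsed JSON of
    GET /jobs/:jobid. Returns (last_state_seen, reached_wanted_state).
    """
    deadline = clock() + timeout_s
    while True:
        state = get_job()["state"]
        if state in wanted:
            return state, True
        if clock() >= deadline:
            # Give up instead of waiting forever, e.g. on a stuck CANCELLING.
            return state, False
        sleep(poll_interval)


def vertex_parallelism(job_details):
    """Map each vertex name to the parallelism reported in the job details."""
    return {v["name"]: v["parallelism"] for v in job_details["vertices"]}
```

A caller could treat a False result as the signal to alert an engineer rather
than retrying the cancel indefinitely.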

On Fri, 7 Feb 2020 at 11:54, Stephen Connolly <
stephen.alan.connolly@gmail.com> wrote:

> And now the job is stuck in a suspended state and I seem to have no way to
> get it out of that state again!

Re: Rescaling a running topology

Posted by Stephen Connolly <st...@gmail.com>.
And now the job is stuck in a suspended state and I seem to have no way to
get it out of that state again!

On Fri, 7 Feb 2020 at 11:50, Stephen Connolly <
stephen.alan.connolly@gmail.com> wrote:

> The plot thickens... I was able to rescale down... just not back up
> again!!!

Re: Rescaling a running topology

Posted by Stephen Connolly <st...@gmail.com>.
The plot thickens... I was able to rescale down... just not back up again!!!

root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink list -m localhost:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
07.02.2020 11:26:33 : ebc20a700c334f61ea03ecdf3d8939ca : Test topology (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 1
Modify job ebc20a700c334f61ea03ecdf3d8939ca.
Rescaled job ebc20a700c334f61ea03ecdf3d8939ca. Its new parallelism is 1.
root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 2
Modify job ebc20a700c334f61ea03ecdf3d8939ca.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not rescale job ebc20a700c334f61ea03ecdf3d8939ca.
    at org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
    at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor.aroundReceive(Actor.scala:502)
    at akka.actor.Actor.aroundReceive$(Actor.scala:500)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
    at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
    at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    ... 20 more
