You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Matt Anger <ma...@li.me> on 2019/10/16 21:46:25 UTC
standalone flink savepoint restoration
Hello everyone,
I am running a flink job in k8s as a standalone HA job. Now I updated my
job w/ some additional sinks, which I guess have made the checkpoints
incompatible with the newer version, meaning flink now crashes on bootup
with the following:
Caused by: java.lang.IllegalStateException: There is no operator for the
state c9b81dfc309f1368ac7efb5864e7b693
So I rollback the deployment, log into the pod and create a savestate, and
then modify my args to add
--allowNonRestoredState
and
-s <savepoint-dir>
but it doesn't look like the standalone cluster is respecting those
arguments. I've tried searching around, but haven't found any solutions.
The docker image I have is running the docker-entrypoint.sh and the full
arg list is below as copy-pastad out of my k8s yaml file:
47 - job-cluster
48 - -Djobmanager.rpc.address=$(SERVICE_NAME)
49 - -Djobmanager.rpc.port=6123
50 - -Dresourcemanager.rpc.port=6123
51 - -Dparallelism.default=$(NUM_WORKERS)
52 - -Dblob.server.port=6124
53 - -Dqueryable-state.server.ports=6125
54 - -Ds3.access-key=$(AWS_ACCESS_KEY_ID)
55 - -Ds3.secret-key=$(AWS_SECRET_ACCESS_KEY)
56 - -Dhigh-availability=zookeeper
57 - -Dhigh-availability.jobmanager.port=50010
58 - -Dhigh-availability.storageDir=$(S3_HA)
59 - -Dhigh-availability.zookeeper.quorum=$(ZK_QUORUM)
60 - -Dstate.backend=filesystem
61 - -Dstate.checkpoints.dir=$(S3_CHECKPOINT)
62 - -Dstate.savepoints.dir=$(S3_SAVEPOINT)
63 - --allowNonRestoredState
64 - -s $(S3_SAVEPOINT)
I originally didn't have the last 2 args, I added them based upon various
emails I saw on this list and other google search results, to no avail.
Thanks
-Matt
Re: standalone flink savepoint restoration
Posted by Congxian Qiu <qc...@gmail.com>.
Hi
Do you specify the operatorid for all the operators?[1][2], asking this
because from the exception you gave, if you only add new operators and all
the old operators have specified operatorid, seems there would not throw
such exception.
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job
Best,
Congxian
Yun Tang <my...@live.com> 于2019年10月17日周四 下午12:31写道:
> Hi Matt
>
> Have you ever configured `high-availability.cluster-id` ? If not, Flink
> standalone job would first try to recover from high-availability checkpoint
> store named '/default'. If there existed a checkpoint, Flink would always
> restore from checkpoint disabling 'allowNonRestoredState'[1] (always
> passing 'false' in). Please consider to configure
> `high-availability.cluster-id` to different values to enable you could
> resume job with dropping some operators.
>
>
> [1]
> https://github.com/apache/flink/blob/7670e237d7d8d3727537c09b8695c860ea92d467/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java#L190
>
> Best
> Yun Tang
> ------------------------------
> *From:* Matt Anger <ma...@li.me>
> *Sent:* Thursday, October 17, 2019 5:46
> *To:* user@flink.apache.org <us...@flink.apache.org>
> *Subject:* standalone flink savepoint restoration
>
> Hello everyone,
> I am running a flink job in k8s as a standalone HA job. Now I updated my
> job w/ some additional sinks, which I guess have made the checkpoints
> incompatible with the newer version, meaning flink now crashes on bootup
> with the following:
> Caused by: java.lang.IllegalStateException: There is no operator for the
> state c9b81dfc309f1368ac7efb5864e7b693
>
> So I rollback the deployment, log into the pod and create a savestate, and
> then modify my args to add
>
> --allowNonRestoredState
> and
> -s <savepoint-dir>
>
> but it doesn't look like the standalone cluster is respecting those
> arguments. I've tried searching around, but haven't found any solutions.
> The docker image I have is running the docker-entrypoint.sh and the full
> arg list is below as copy-pastad out of my k8s yaml file:
>
> 47 - job-cluster
> 48 - -Djobmanager.rpc.address=$(SERVICE_NAME)
> 49 - -Djobmanager.rpc.port=6123
> 50 - -Dresourcemanager.rpc.port=6123
> 51 - -Dparallelism.default=$(NUM_WORKERS)
> 52 - -Dblob.server.port=6124
> 53 - -Dqueryable-state.server.ports=6125
> 54 - -Ds3.access-key=$(AWS_ACCESS_KEY_ID)
> 55 - -Ds3.secret-key=$(AWS_SECRET_ACCESS_KEY)
> 56 - -Dhigh-availability=zookeeper
> 57 - -Dhigh-availability.jobmanager.port=50010
> 58 - -Dhigh-availability.storageDir=$(S3_HA)
> 59 - -Dhigh-availability.zookeeper.quorum=$(ZK_QUORUM)
> 60 - -Dstate.backend=filesystem
> 61 - -Dstate.checkpoints.dir=$(S3_CHECKPOINT)
> 62 - -Dstate.savepoints.dir=$(S3_SAVEPOINT)
> 63 - --allowNonRestoredState
> 64 - -s $(S3_SAVEPOINT)
>
> I originally didn't have the last 2 args, I added them based upon various
> emails I saw on this list and other google search results, to no avail.
>
> Thanks
> -Matt
>
Re: standalone flink savepoint restoration
Posted by Yun Tang <my...@live.com>.
Hi Matt
Have you ever configured `high-availability.cluster-id` ? If not, Flink standalone job would first try to recover from high-availability checkpoint store named '/default'. If there existed a checkpoint, Flink would always restore from checkpoint disabling 'allowNonRestoredState'[1] (always passing 'false' in). Please consider to configure `high-availability.cluster-id` to different values to enable you could resume job with dropping some operators.
[1] https://github.com/apache/flink/blob/7670e237d7d8d3727537c09b8695c860ea92d467/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java#L190
Best
Yun Tang
________________________________
From: Matt Anger <ma...@li.me>
Sent: Thursday, October 17, 2019 5:46
To: user@flink.apache.org <us...@flink.apache.org>
Subject: standalone flink savepoint restoration
Hello everyone,
I am running a flink job in k8s as a standalone HA job. Now I updated my job w/ some additional sinks, which I guess have made the checkpoints incompatible with the newer version, meaning flink now crashes on bootup with the following:
Caused by: java.lang.IllegalStateException: There is no operator for the state c9b81dfc309f1368ac7efb5864e7b693
So I rollback the deployment, log into the pod and create a savestate, and then modify my args to add
--allowNonRestoredState
and
-s <savepoint-dir>
but it doesn't look like the standalone cluster is respecting those arguments. I've tried searching around, but haven't found any solutions. The docker image I have is running the docker-entrypoint.sh and the full arg list is below as copy-pastad out of my k8s yaml file:
47 - job-cluster
48 - -Djobmanager.rpc.address=$(SERVICE_NAME)
49 - -Djobmanager.rpc.port=6123
50 - -Dresourcemanager.rpc.port=6123
51 - -Dparallelism.default=$(NUM_WORKERS)
52 - -Dblob.server.port=6124
53 - -Dqueryable-state.server.ports=6125
54 - -Ds3.access-key=$(AWS_ACCESS_KEY_ID)
55 - -Ds3.secret-key=$(AWS_SECRET_ACCESS_KEY)
56 - -Dhigh-availability=zookeeper
57 - -Dhigh-availability.jobmanager.port=50010
58 - -Dhigh-availability.storageDir=$(S3_HA)
59 - -Dhigh-availability.zookeeper.quorum=$(ZK_QUORUM)
60 - -Dstate.backend=filesystem
61 - -Dstate.checkpoints.dir=$(S3_CHECKPOINT)
62 - -Dstate.savepoints.dir=$(S3_SAVEPOINT)
63 - --allowNonRestoredState
64 - -s $(S3_SAVEPOINT)
I originally didn't have the last 2 args, I added them based upon various emails I saw on this list and other google search results, to no avail.
Thanks
-Matt