Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/04/25 16:34:04 UTC
[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator
[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983195#comment-15983195 ]
ASF GitHub Bot commented on FLINK-5892:
---------------------------------------
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/3770
[FLINK-5892] Restore state on the operator level
## General
This PR is a collaboration between @guoweiM and myself, enabling Flink to restore state on the operator level. This means that a job's topology may change with regard to chaining when restoring from a 1.3 savepoint, allowing chains to be arbitrarily added, removed or modified.
The cornerstone of this is a semantic change to savepoints; no structural changes have been made to the `SavepointV0/1/2` classes or their serialized format:
In 1.2, a savepoint contains the states of tasks. If a task consists of multiple operators, the stored TaskState internally contains a list of states, one entry per operator.
In 1.3, a savepoint contains only the states of operators; the notion of tasks is eliminated. If a task consists of multiple operators, we instead store one TaskState per operator, each internally containing a list of states of length 1.
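The difference between the two layouts can be modeled with plain maps. The following is a minimal sketch, not the `SavepointV0/1/2` or `SavepointLoader` code: states are non-null strings, parallelism is ignored, and the class and method names are hypothetical. It also assumes that the operator IDs of each task are known in chain order and that the i-th state stored for a task belongs to the i-th operator of that chain.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the savepoint semantics change; not Flink's SavepointV0/1/2 code.
 * States are plain non-null strings and parallelism (subtasks) is ignored for brevity.
 */
public final class SavepointSemantics {

    private SavepointSemantics() {}

    /**
     * Converts 1.2-style entries (task ID -> one state per chained operator) into
     * 1.3-style entries (operator ID -> list containing exactly one state).
     *
     * @param taskStates         task ID -> states of the chained operators, in chain order
     * @param operatorIdsPerTask task ID -> IDs of the chained operators, in chain order
     */
    public static Map<String, List<String>> toOperatorLevel(
            Map<String, List<String>> taskStates,
            Map<String, List<String>> operatorIdsPerTask) {
        Map<String, List<String>> operatorStates = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> task : taskStates.entrySet()) {
            List<String> operatorIds = operatorIdsPerTask.get(task.getKey());
            List<String> chainStates = task.getValue();
            if (operatorIds == null || operatorIds.size() != chainStates.size()) {
                throw new IllegalArgumentException("Cannot map the operators of task " + task.getKey());
            }
            for (int i = 0; i < operatorIds.size(); i++) {
                // Assumption: the i-th state in the chain belongs to the i-th operator ID.
                // The task-level grouping is dropped; each operator gets its own single-entry list.
                operatorStates.put(operatorIds.get(i), List.of(chainStates.get(i)));
            }
        }
        return operatorStates;
    }
}
```

For a task `task-A` chaining `op-1` and `op-2`, the single 1.2 entry `task-A -> [s1, s2]` becomes the two 1.3 entries `op-1 -> [s1]` and `op-2 -> [s2]`.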
## Implementation
For this to work, a number of changes had to be made.
First and foremost, we required a new, operator-aware `StateAssignmentOperation`.
(74881a2, 8be9c58, 4fa8bbd)
Since the `StateAssignmentOperation` uses the `ExecutionGraph` classes to map the restored state, it was necessary to forward the IDs of all operators contained in each vertex from the `StreamingJobGraphGenerator` to the `ExecutionJobVertex`.
(73427c3)
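Once each vertex knows the IDs of its chained operators, restored state can be looked up per operator regardless of how the chains are now arranged. The following is a minimal sketch of that idea, not the actual `StateAssignmentOperation`: the `Vertex` record and `assign` method are hypothetical, parallelism and key-group redistribution are ignored, and rejecting state that no operator claims is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of operator-level state assignment; not Flink's StateAssignmentOperation. */
public final class OperatorStateAssignment {

    private OperatorStateAssignment() {}

    /** A vertex (task) of the new topology with the IDs of its chained operators, in chain order. */
    public record Vertex(String vertexId, List<String> operatorIds) {}

    /**
     * Maps restored operator states (operator ID -> state) onto the vertices of a new topology.
     * The result holds, per vertex, one entry per chained operator; null means "no restored state".
     */
    public static Map<String, List<String>> assign(
            List<Vertex> newTopology, Map<String, String> restoredOperatorStates) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        Set<String> unassigned = new HashSet<>(restoredOperatorStates.keySet());
        for (Vertex vertex : newTopology) {
            List<String> chainStates = new ArrayList<>();
            for (String operatorId : vertex.operatorIds()) {
                // Lookup is by operator ID only, so the old chain layout is irrelevant.
                // Stateless or newly added operators have no entry and receive null.
                chainStates.add(restoredOperatorStates.get(operatorId));
                unassigned.remove(operatorId);
            }
            assignment.put(vertex.vertexId(), chainStates);
        }
        if (!unassigned.isEmpty()) {
            // Sketch-specific choice: state whose operator no longer exists is an error.
            throw new IllegalStateException("No operator found for restored state of " + unassigned);
        }
        return assignment;
    }
}
```

For example, if `op-1` and `op-2` were chained in 1.2 but the new job puts `op-1` in vertex `v1` and chains `op-2` with a new `op-3` in `v2`, the result is `v1 -> [s1]` and `v2 -> [s2, null]`.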
The `PendingCheckpoint` class had to be adjusted to conform to the new semantics: received `SubtaskState`s, each containing the state of a task, are broken down into `SubtaskState`s for the individual operators.
(f7b8ef9)
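The breakdown happens as acknowledgements arrive from the running subtasks. The sketch below is hypothetical and heavily simplified, not the real `PendingCheckpoint`: a subtask's state is modeled as a list with one non-null string per chained operator, and the names `acknowledgeTask` and `operatorStates` are invented for illustration. It splits each acknowledgement into per-operator, per-subtask entries.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: splitting task-level acknowledgements into per-operator state. */
public final class PendingCheckpointSketch {

    // operator ID -> (subtask index -> state of that operator in that subtask)
    private final Map<String, Map<Integer, String>> operatorStates = new HashMap<>();

    /**
     * Records the acknowledgement of one subtask of a task that chains the given operators.
     * subtaskState holds one non-null entry per chained operator, in chain order.
     */
    public void acknowledgeTask(List<String> chainedOperatorIds, int subtaskIndex, List<String> subtaskState) {
        if (chainedOperatorIds.size() != subtaskState.size()) {
            throw new IllegalArgumentException("Expected one state entry per chained operator");
        }
        // Validate first so that a rejected acknowledgement leaves no partial entries behind.
        for (String operatorId : chainedOperatorIds) {
            Map<Integer, String> existing = operatorStates.get(operatorId);
            if (existing != null && existing.containsKey(subtaskIndex)) {
                throw new IllegalStateException("Duplicate acknowledgement for subtask " + subtaskIndex);
            }
        }
        // Break the task-level state down into one entry per operator.
        for (int i = 0; i < chainedOperatorIds.size(); i++) {
            operatorStates
                    .computeIfAbsent(chainedOperatorIds.get(i), id -> new HashMap<>())
                    .put(subtaskIndex, subtaskState.get(i));
        }
    }

    /** Live view of the collected per-operator states. */
    public Map<String, Map<Integer, String>> operatorStates() {
        return operatorStates;
    }
}
```

After both subtasks of a task chaining `op-1` and `op-2` acknowledge, nothing about the task remains in the collected state; only `op-1` and `op-2`, each with one entry per subtask.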
## Tests
The majority of this PR (roughly 60%) consists of new tests.
A number of tests were added to flink-tests that cover the migration path from 1.2 to 1.3.
(d1efdb1)
These tests first restore a job from a 1.2 savepoint without changes to the topology, verify that the state was restored correctly, and finally create a new savepoint. They then restore from this migrated 1.3 savepoint with changes to the topology for various scenarios, and again verify that the state was restored correctly.
A new test was also added to `CheckpointCoordinatorTest` that checks the support for topology changes without executing a job.
(8b5430f9)
A number of existing tests had to be tweaked to run with the new changes, but these changes all boil down to extending existing mocks by a method or two.
(b5430f9)
## Other changes
To make it more obvious that we deal with operators and not tasks, a new `OperatorID` class was introduced, and usages of `JobVertexID` in savepoint-related parts were replaced where appropriate.
(fe74023)
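The benefit of a dedicated ID type is that the compiler keeps operator and vertex identifiers apart. The following is a minimal illustration using hypothetical records, not Flink's `OperatorID` or `JobVertexID` classes, whose actual definitions are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/** Hypothetical typed identifiers illustrating why a dedicated operator ID type helps. */
public final class TypedIds {

    private TypedIds() {}

    /** Identifies a task (job vertex), which may contain several chained operators. */
    public record JobVertexId(UUID value) {
        public JobVertexId {
            Objects.requireNonNull(value, "value");
        }
    }

    /** Identifies a single operator, independent of how it is chained. */
    public record OperatorId(UUID value) {
        public OperatorId {
            Objects.requireNonNull(value, "value");
        }
    }

    /** Savepoint-related code keyed by OperatorId cannot accidentally be handed a JobVertexId. */
    public static final class OperatorStateRegistry {
        private final Map<OperatorId, String> states = new HashMap<>();

        public void put(OperatorId id, String state) {
            states.put(id, state);
        }

        public String get(OperatorId id) {
            return states.get(id);
        }
    }
}
```

With this split, a call such as `registry.get(new TypedIds.JobVertexId(uuid))` fails to compile, whereas a single shared ID type would silently accept it.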
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink 5982_operator_state
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3770.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3770
----
commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol <ch...@apache.org>
Date: 2017-04-03T15:39:50Z
[prerequisite] Disable exception when assigning uid on chained operator
commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol <ch...@apache.org>
Date: 2017-04-04T10:53:56Z
[internal] Adjust SavepointLoader to new Savepoint semantics
commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol <ch...@apache.org>
Date: 2017-04-04T13:02:55Z
[internal] adjust PendingCheckpoint to be in line with new semantics
commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol <ch...@apache.org>
Date: 2017-04-04T11:33:54Z
[internal] Get operator ID's into ExecutionGraph
commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol <ch...@apache.org>
Date: 2017-04-25T16:07:16Z
[internals] Extract several utility methods from StateAssignmentOperation
commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw <gu...@gmail.com>
Date: 2017-04-24T09:47:47Z
[internal] Add new StateAssignmentOperation
commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol <ch...@apache.org>
Date: 2017-04-04T13:01:07Z
[internal] Integrate new StateAssignmentOperation version
commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol <ch...@apache.org>
Date: 2017-04-03T15:40:21Z
[tests] Add tests for chain modifications
commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
Author: zentol <ch...@apache.org>
Date: 2017-04-24T11:58:07Z
[tests] Adjust existing tests
commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
Author: guowei.mgw <gu...@gmail.com>
Date: 2017-04-24T10:13:44Z
[tests] Add tests for topology modifications
commit fe7402358a89c37bd470437f9c3f05d7ff3d3ca1
Author: zentol <ch...@apache.org>
Date: 2017-04-25T14:08:07Z
[internal] Introduce OperatorID for state business
----
> Recover job state at the granularity of operator
> ------------------------------------------------
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Guowei Ma
> Assignee: Guowei Ma
>
> JobGraph has no `Operator` info, so `ExecutionGraph` can only recover state at the granularity of tasks.
> As a result, an operator of the job may fail to recover its state from a savepoint even if the savepoint contains that operator's state.
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)