Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/04/25 16:34:04 UTC

[jira] [Commented] (FLINK-5892) Recover job state at the granularity of operator

    [ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983195#comment-15983195 ] 

ASF GitHub Bot commented on FLINK-5892:
---------------------------------------

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3770

    [FLINK-5892] Restore state on the operator level

    ## General
    This PR is a collaboration between @guoweiM and myself that enables Flink to restore state at the operator level. As a result, the chaining of a job's topology may change when restoring from a 1.3 savepoint: chains can be added, removed or modified arbitrarily.
    
    The cornerstone for this is a semantic change for savepoints; no structural changes have been made to the `SavepointV0/1/2` classes or their serialized format:
    
    In 1.2 a savepoint contains the states of tasks. If a task consists of multiple operators then the stored TaskState internally contains a list of states, one entry for each operator.
    
    In 1.3 a savepoint contains the states of operators only; the notion of tasks is eliminated. If a task consists of multiple operators we store one TaskState for each operator instead. Internally they each contain a list of states with a length of 1.
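    
    The difference between the two layouts can be illustrated with a minimal sketch. `StoredState` and `splitByOperator` below are hypothetical stand-ins invented for illustration; they are not Flink's `TaskState` or any other Flink API, and states are reduced to plain strings.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SavepointSemanticsSketch {

        /** Hypothetical stand-in for one stored entry; this is NOT Flink's TaskState class. */
        public static final class StoredState {
            public final String ownerId;              // task ID (1.2 layout) or operator ID (1.3 layout)
            public final List<String> operatorStates; // one element per operator covered by this entry

            public StoredState(String ownerId, List<String> operatorStates) {
                this.ownerId = ownerId;
                this.operatorStates = operatorStates;
            }
        }

        /** Splits a task-level entry into one entry per operator, each holding a list of length 1. */
        public static List<StoredState> splitByOperator(StoredState taskLevel, List<String> operatorIds) {
            if (operatorIds.size() != taskLevel.operatorStates.size()) {
                throw new IllegalArgumentException("expected one operator ID per stored state");
            }
            List<StoredState> perOperator = new ArrayList<>();
            for (int i = 0; i < operatorIds.size(); i++) {
                perOperator.add(new StoredState(
                        operatorIds.get(i),
                        Collections.singletonList(taskLevel.operatorStates.get(i))));
            }
            return perOperator;
        }

        public static void main(String[] args) {
            // 1.2 layout: a single entry for a task that chains two operators.
            StoredState task = new StoredState("task-1", Arrays.asList("source-state", "map-state"));

            // 1.3 layout: one entry per operator, each with exactly one state.
            for (StoredState s : splitByOperator(task, Arrays.asList("op-source", "op-map"))) {
                System.out.println(s.ownerId + " -> " + s.operatorStates);
            }
        }
    }
    ```
    
    Because every entry is keyed by an operator rather than by a task, the entries no longer encode how operators were chained when the savepoint was taken.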
    
    ## Implementation
    
    In order for this to work a number of changes had to be made.
    
    First and foremost we required a new `StateAssignmentOperation` that was aware of operators.
    (74881a2, 8be9c58, 4fa8bbd)
    
    Since the SAO uses the `ExecutionGraph` classes to map the restored state it was necessary to forward the IDs of all contained operators from the `StreamingJobGraphGenerator` to the `ExecutionJobVertex`.
    (73427c3)
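    
    As a rough illustration of why the operator IDs are needed on the vertex, the sketch below maps restored state onto a new topology by operator ID alone. All names (`OperatorAssignmentSketch`, `assign`, the string IDs) are hypothetical and do not reflect the actual `StateAssignmentOperation` code; parallelism, key groups and the handling of state that matches no operator are deliberately left out.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class OperatorAssignmentSketch {

        /**
         * For each vertex of the new topology, looks up the restored state of every
         * operator it contains. Operators without restored state get null.
         */
        public static Map<String, List<String>> assign(
                Map<String, String> restoredByOperatorId,
                Map<String, List<String>> operatorIdsByVertex) {
            Map<String, List<String>> assigned = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> vertex : operatorIdsByVertex.entrySet()) {
                List<String> states = new ArrayList<>();
                for (String operatorId : vertex.getValue()) {
                    states.add(restoredByOperatorId.get(operatorId));
                }
                assigned.put(vertex.getKey(), states);
            }
            return assigned;
        }

        public static void main(String[] args) {
            // State as restored from the savepoint, keyed by (hypothetical) operator IDs.
            Map<String, String> restored = new HashMap<>();
            restored.put("op-source", "source-state");
            restored.put("op-map", "map-state");

            // New topology: the source now runs alone, and the map is chained with a new filter.
            Map<String, List<String>> topology = new LinkedHashMap<>();
            topology.put("vertex-A", Arrays.asList("op-source"));
            topology.put("vertex-B", Arrays.asList("op-map", "op-new-filter"));

            System.out.println(assign(restored, topology));
        }
    }
    ```
    
    The lookup never consults the old chain layout, which is what allows chains to change between taking and restoring a savepoint.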
    
    The `PendingCheckpoint` class had to be adjusted to conform to the new semantics; received `SubtaskStates`, containing the state of a task, are broken down into SubtaskStates for the individual operators.
    (f7b8ef9)
    
    ## Tests
    
    The majority of this PR (60% or so) consists of new tests.
    
    A number of tests were added under flink-tests that test the migration path from 1.2 to 1.3.
    (d1efdb1)
    
    These tests first restore a job from a 1.2 savepoint, without changes to the topology, verify that the state was restored correctly and finally create a new savepoint. They then restore from this migrated 1.3 savepoint, with changes to the topology for varying scenarios, and verify the correct restoration of state again.
    
    A new test was also added to the `CheckpointCoordinatorTest`; it verifies the support for topology changes without executing a job.
    (b5430f9)
    
    A number of existing tests had to be tweaked to run with the new changes, but these changes all boil down to extending existing mocks by a method or two.
    (8b45b5a)
    
    ## Other changes
    
    To make it more obvious that we deal with operators and not tasks a new `OperatorID` class was introduced, and usages of `JobVertexID` in savepoint-related parts were replaced when appropriate.
    (fe74023)
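    
    The benefit of a dedicated ID type can be sketched as follows. `SketchVertexId` and `SketchOperatorId` are hypothetical classes written for this illustration only; they are not Flink's `JobVertexID` or `OperatorID`, whose actual implementations are not shown in this PR description.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    public class TypedIdSketch {

        /** Hypothetical vertex-level ID (illustration only). */
        public static final class SketchVertexId {
            public final String value;
            public SketchVertexId(String value) { this.value = Objects.requireNonNull(value); }
            @Override public boolean equals(Object o) {
                return o instanceof SketchVertexId && ((SketchVertexId) o).value.equals(value);
            }
            @Override public int hashCode() { return value.hashCode(); }
        }

        /** Hypothetical operator-level ID (illustration only). */
        public static final class SketchOperatorId {
            public final String value;
            public SketchOperatorId(String value) { this.value = Objects.requireNonNull(value); }
            @Override public boolean equals(Object o) {
                return o instanceof SketchOperatorId && ((SketchOperatorId) o).value.equals(value);
            }
            @Override public int hashCode() { return value.hashCode(); }
        }

        /** State lookup that only accepts operator IDs. */
        public static String lookupState(Map<SketchOperatorId, String> states, SketchOperatorId id) {
            return states.get(id);
        }

        public static void main(String[] args) {
            Map<SketchOperatorId, String> states = new HashMap<>();
            states.put(new SketchOperatorId("abc"), "map-state");

            System.out.println(lookupState(states, new SketchOperatorId("abc")));

            // The following line would not compile, because a vertex ID is not an operator ID:
            // lookupState(states, new SketchVertexId("abc"));

            // Even with the same underlying value, the two ID kinds never compare equal.
            System.out.println(new SketchOperatorId("abc").equals(new SketchVertexId("abc")));
        }
    }
    ```
    
    With separate types, passing a vertex-level ID where operator-level state is expected becomes a compile-time error rather than a silent mismatch.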
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 5982_operator_state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3770
    
----
commit abe1bb9416b2a4159a3667d6845a4a9776abdc4f
Author: zentol <ch...@apache.org>
Date:   2017-04-03T15:39:50Z

    [prerequisite] Disable exception when assigning uid on chained operator

commit 74881a2788d034db67d99d6d32dbb2cf923aed53
Author: zentol <ch...@apache.org>
Date:   2017-04-04T10:53:56Z

    [internal] Adjust SavepointLoader to new Savepoint semantics

commit f7b8ef943097cd994a4ef3d5594fea4027720f5a
Author: zentol <ch...@apache.org>
Date:   2017-04-04T13:02:55Z

    [internal] adjust PendingCheckpoint to be in line with new semantics

commit 73427c3fc7d69f5d072d9d8a4809d449e0c5bdac
Author: zentol <ch...@apache.org>
Date:   2017-04-04T11:33:54Z

    [internal] Get operator ID's into ExecutionGraph

commit 465805792932cb888393d9257fdefd828fa59343
Author: zentol <ch...@apache.org>
Date:   2017-04-25T16:07:16Z

    [internals] Extract several utility methods from StateAssignmentOperation

commit 008e848715b7091c3deabc9251d9d673f5506e64
Author: guowei.mgw <gu...@gmail.com>
Date:   2017-04-24T09:47:47Z

    [internal] Add new StateAssignmentOperation

commit ffb93298ce90956b9886b3526258f6a814b7e0af
Author: zentol <ch...@apache.org>
Date:   2017-04-04T13:01:07Z

    [internal] Integrate new StateAssignmentOperation version

commit d1efdb1c34d59f04147292b320528cd2bc838244
Author: zentol <ch...@apache.org>
Date:   2017-04-03T15:40:21Z

    [tests] Add tests for chain modifications

commit 8b45b5a77f2cc499fdbb41d8198ac0a2e25bb1d7
Author: zentol <ch...@apache.org>
Date:   2017-04-24T11:58:07Z

    [tests] Adjust existing tests

commit b5430f98bfbb56e49f9a8b21fe5b1e5dd7358714
Author: guowei.mgw <gu...@gmail.com>
Date:   2017-04-24T10:13:44Z

    [tests] Add tests for topology modifications

commit fe7402358a89c37bd470437f9c3f05d7ff3d3ca1
Author: zentol <ch...@apache.org>
Date:   2017-04-25T14:08:07Z

    [internal] Introduce OperatorID for state business

----


> Recover job state at the granularity of operator
> ------------------------------------------------
>
>                 Key: FLINK-5892
>                 URL: https://issues.apache.org/jira/browse/FLINK-5892
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Guowei Ma
>            Assignee: Guowei Ma
>
> JobGraph has no `Operator` info, so `ExecutionGraph` can only recover at the granularity of tasks.
> As a result, an operator of the job may fail to recover its state from a savepoint even if the savepoint contains that operator's state.
>  https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)