Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/02/15 06:51:33 UTC

[GitHub] Myasuka opened a new pull request #7711: [FLINK-11618][state] Refactor operator state repartition mechanism

URL: https://github.com/apache/flink/pull/7711
 
 
   ## What is the purpose of the change
   Two problems exist in state assignment when the parallelism is unchanged:
   
   1.  If only even-split redistributed state is present, the current implementation cannot guarantee that the state assignment stays the same as before. This is because `StateAssignmentOperation#collectPartitionableStates` currently repartitions `managedOperatorStates` without subtask-index information. For example, suppose an operator has parallelism 2, and subtask-0's managed operator state is empty while subtask-1's is not. Although the new parallelism is still 2, after `StateAssignmentOperation#collectPartitionableStates` runs and the state is assigned, subtask-0 is assigned the managed operator state while subtask-1 gets none.
   2.  Only union state should be redistributed; even-split state should be left untouched. Redistributing even-split state would cause unexpected behavior once `RestartPipelinedRegionStrategy` supports restoring state.
   
   This PR will fix the above two problems.
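
The first problem can be illustrated with a small, self-contained sketch. It is a simplified model and not Flink's actual code: state handles are plain strings, and the class `SubtaskIndexSketch` with its methods `flatten`, `roundRobin`, and `indexAware` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model: each inner list holds the state handles of one subtask.
public class SubtaskIndexSketch {

    /** Flattens per-subtask state into one list; the subtask index of each handle is lost. */
    static List<String> flatten(List<List<String>> perSubtask) {
        List<String> flat = new ArrayList<>();
        for (List<String> handles : perSubtask) {
            flat.addAll(handles);
        }
        return flat;
    }

    /** Deals the flattened handles out round-robin over the new parallelism. */
    static List<List<String>> roundRobin(List<String> flat, int newParallelism) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < flat.size(); i++) {
            result.get(i % newParallelism).add(flat.get(i));
        }
        return result;
    }

    /** Keeps the previous assignment when parallelism is unchanged, else falls back to round-robin. */
    static List<List<String>> indexAware(List<List<String>> perSubtask, int newParallelism) {
        if (perSubtask.size() == newParallelism) {
            return perSubtask;
        }
        return roundRobin(flatten(perSubtask), newParallelism);
    }

    public static void main(String[] args) {
        // Subtask 0 has no state, subtask 1 has one handle; parallelism stays at 2.
        List<List<String>> previous = Arrays.asList(
            Collections.<String>emptyList(),
            Collections.singletonList("handle-of-subtask-1"));

        // Index-unaware path: the handle moves to subtask 0.
        System.out.println(roundRobin(flatten(previous), 2)); // [[handle-of-subtask-1], []]
        // Index-aware path: the handle stays on subtask 1.
        System.out.println(indexAware(previous, 2));          // [[], [handle-of-subtask-1]]
    }
}
```

In this model, keeping the outer list indexed by subtask is what allows the unchanged-parallelism case to return the previous assignment as-is.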
   
   ## Brief change log
   
     - Refactor `StateAssignmentOperation#collectPartitionableStates` so that the returned `managedOperatorStates` carries subtask-index information.
     - Refactor the API of `OperatorStateRepartitioner` and its implementation `RoundRobinOperatorStateRepartitioner`. The logic that compares the old and new parallelism now lives in `OperatorStateRepartitioner#repartitionState`.
     - Refactor `AbstractStreamOperatorTestHarness` to extract the operator-state repartitioning logic into a new method, `AbstractStreamOperatorTestHarness#repartitionOperatorState`, instead of keeping it inside `AbstractStreamOperatorTestHarness#initializeState`.
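
A rough sketch of the behavior described above, where the repartitioner itself compares the old and new parallelism and only redistributes union state when they are equal, might look as follows. This is an illustration under assumptions, not the code in this PR: the `Partition` and `Mode` types and the exact `repartitionState` signature are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RepartitionSketch {

    enum Mode { SPLIT_DISTRIBUTE, UNION }

    /** Hypothetical stand-in for one piece of operator state held by one subtask. */
    static final class Partition {
        final String name;
        final Mode mode;
        Partition(String name, Mode mode) { this.name = name; this.mode = mode; }
        @Override public String toString() { return name; }
    }

    /** previous.get(i) holds the state of old subtask i; previous.size() must equal oldParallelism. */
    static List<List<Partition>> repartitionState(
            List<List<Partition>> previous, int oldParallelism, int newParallelism) {
        List<List<Partition>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        List<Partition> union = new ArrayList<>();
        int next = 0; // round-robin cursor, used only when parallelism changes
        for (int subtask = 0; subtask < oldParallelism; subtask++) {
            for (Partition p : previous.get(subtask)) {
                if (p.mode == Mode.UNION) {
                    union.add(p);
                } else if (oldParallelism == newParallelism) {
                    result.get(subtask).add(p); // even-split state stays where it was
                } else {
                    result.get(next++ % newParallelism).add(p);
                }
            }
        }
        // Union state is always redistributed: every new subtask receives all of it.
        for (List<Partition> subtaskState : result) {
            subtaskState.addAll(union);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Partition>> previous = Arrays.asList(
            Arrays.asList(new Partition("u0", Mode.UNION)),
            Arrays.asList(new Partition("s1", Mode.SPLIT_DISTRIBUTE), new Partition("u1", Mode.UNION)));
        System.out.println(repartitionState(previous, 2, 2)); // [[u0, u1], [s1, u0, u1]]
        System.out.println(repartitionState(previous, 2, 3)); // [[s1, u0, u1], [u0, u1], [u0, u1]]
    }
}
```

Passing both parallelism values into the method keeps the unchanged-parallelism decision in one place instead of spreading it across callers.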
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Added `StateAssignmentOperationTest` to verify the logic of `StateAssignmentOperation#reDistributePartitionableStates`.
     - All existing tests that use `AbstractStreamOperatorTestHarness` verify that the refactored `StateAssignmentOperation` generates the same results as before.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes, this could affect state assignment when restoring from a checkpoint
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable; this is an internal implementation change, and the current documentation should be sufficient.
   
