Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2017/07/27 15:40:56 UTC

[GitHub] flink pull request #4410: [FLINK-7268] [checkpoints] Scope SharedStateRegist...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/4410

    [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start

    ## What is the purpose of the change
    
    This PR fixes FLINK-7268. The problem was that `ZookeeperCompletedCheckpointStore` deletes checkpoints asynchronously. When such a delete runs in parallel with a restart, the async delete can still perform shared state de-registration.
    
    Before this PR, the old `SharedStateRegistry` was kept after a restart and its counts were updated from the completed checkpoint store. In the described race, a checkpoint that has a pending delete does not contribute to the new count, but it can still decrement the count once its shared state is unregistered in the async deletion thread. This can accidentally drop counts below 1 and lead to data loss.
    
    The core idea behind the PR is to scope the `SharedStateRegistry` per (re-)start, so that old pending deletes cannot influence the current count.
    
    `SharedStateRegistry` is now created via a factory that is passed into the `CheckpointCoordinator` to simplify testing.
    
    The PR also introduces additional tests and improves the debug/trace logging of incremental checkpointing.
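    
    As a rough illustration only (this is not the code in this PR), the scoping idea can be sketched with toy classes. `ToyRegistry`, `ToyCoordinator`, `ScopedRegistryDemo` and the key `sst-1` are hypothetical names made up for this sketch; the real `SharedStateRegistry` and `CheckpointCoordinator` are more involved.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Supplier;
    
    /** Toy reference-counting registry; a stand-in, not Flink's SharedStateRegistry. */
    class ToyRegistry {
        private final Map<String, Integer> counts = new HashMap<>();
        private boolean closed;
    
        /** Adds one reference to the given key; ignored once the registry is closed. */
        synchronized void register(String key) {
            if (!closed) {
                counts.merge(key, 1, Integer::sum);
            }
        }
    
        /** Drops one reference; returns true if the count reached zero (state would be deleted). */
        synchronized boolean unregister(String key) {
            Integer c = counts.get(key);
            if (c == null) {
                return false;
            }
            if (c == 1) {
                counts.remove(key);
                return true;
            }
            counts.put(key, c - 1);
            return false;
        }
    
        synchronized int count(String key) {
            return counts.getOrDefault(key, 0);
        }
    
        synchronized void close() {
            closed = true;
        }
    }
    
    /** Toy coordinator that receives a registry factory and swaps registries on restore. */
    class ToyCoordinator {
        private final Supplier<ToyRegistry> factory;
        private ToyRegistry registry;
    
        ToyCoordinator(Supplier<ToyRegistry> factory) {
            this.factory = factory;
            this.registry = factory.get();
        }
    
        synchronized ToyRegistry registry() {
            return registry;
        }
    
        /** Closes the old registry, creates a fresh one, and re-registers retained state. */
        synchronized ToyRegistry restore(String... retainedKeys) {
            ToyRegistry old = registry;
            old.close();
            registry = factory.get();
            for (String key : retainedKeys) {
                registry.register(key);
            }
            return old;
        }
    }
    
    class ScopedRegistryDemo {
        public static void main(String[] args) {
            ToyCoordinator coordinator = new ToyCoordinator(ToyRegistry::new);
    
            // Checkpoints 1 and 2 both reference the shared file "sst-1".
            coordinator.registry().register("sst-1");
            coordinator.registry().register("sst-1");
    
            // Restart: only checkpoint 2 is recovered; checkpoint 1 still has a pending async delete.
            ToyRegistry old = coordinator.restore("sst-1");
    
            // The late async delete of checkpoint 1 only touches the old registry object.
            old.unregister("sst-1");
    
            System.out.println("current count: " + coordinator.registry().count("sst-1"));
        }
    }
    ```
    
    In this sketch the late delete decrements a count in the old registry object only, so the count in the registry created for the current run is unaffected. Passing the factory into the coordinator also lets a test supply its own registry implementation.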
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    Run a job with keyed state, using incremental checkpoints and HA mode. Kill TMs to trigger recovery. After a couple of attempts, the problematic condition should be triggered, leading to an infinite recovery loop due to `FileNotFoundException`.
    
    Additional tests:
    
    `HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase`
    `CheckpointCoordinatorTest::testSharedStateRegistrationOnRestore`
    `IncrementalKeyedStateHandleTest::testSharedStateReRegistration`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (YES)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink ImprovedIncreemntalCPDebug

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4410
    
----
commit 2ed4f6b28c2fda674f1319f2a3678b2a231988ac
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-06-26T16:07:59Z

    [FLINK-7213] Introduce state management by OperatorID in TaskManager

commit a50eda8602d2034753b42413d23842a888e73611
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-07-11T15:10:03Z

    [FLINK-7213] Introduce TaskStateSnapshot to unify TaskStateHandles and SubtaskState

commit bce928fcfec73ff7584840ae7eb6b31fb727604f
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-07-25T10:14:03Z

    review comments zentol

commit 363f0ee18e06affe95c30095ed229ca8dfd47801
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-07-26T11:31:30Z

    review comments zentol part 2

commit 98e657ea02c17d972391dd2360c287a00f27231e
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-07-26T11:31:47Z

    review comments zentol part 2

commit 4460d3412f3ffc2e5496b65949751a31dae8a01a
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-07-25T10:04:16Z

    [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4410: [FLINK-7268] [checkpoints] Scope SharedStateRegistry obje...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4410
  
    CC @aljoscha 



[GitHub] flink pull request #4410: [FLINK-7268] [checkpoints] Scope SharedStateRegist...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4410#discussion_r134647310
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -1044,10 +1049,23 @@ public boolean restoreLatestCheckpointedState(
     				throw new IllegalStateException("CheckpointCoordinator is shut down");
     			}
     
    -			// Recover the checkpoints
    -			completedCheckpointStore.recover(sharedStateRegistry);
    +			// We create a new shared state registry object, so that all pending async disposal requests from previous
    +			// runs will go against the old object (were they can do no harm).
    +			// This must happen under the checkpoint lock.
    +			sharedStateRegistry.close();
    +			sharedStateRegistry = sharedStateRegistryFactory.create(executor);
    +
    +			// Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
    --- End diff --
    
    If we use highAvailabilityServices.getJobManagerLeaderRetriever(), Job Id is required.
    Can Job Id be obtained from JobVertexID ?



[GitHub] flink pull request #4410: [FLINK-7268] [checkpoints] Scope SharedStateRegist...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4410



[GitHub] flink issue #4410: [FLINK-7268] [checkpoints] Scope SharedStateRegistry obje...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4410
  
    I also reviewed `CheckpointCoordinatorTest.testSharedStateRegistrationOnRestore()`. Looks good 👍 The rest is the same as in #4410 so let's wait until #4353 is merged and then we can merge this one as well.

