Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2017/07/27 15:47:18 UTC

[GitHub] flink pull request #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope Sha...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/4413

    [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope SharedStateRegistry objects

    See #4410. This is a bugfix backport from 1.4 to 1.3.x.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink IncrementalFix-1.3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4413.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4413
    
----
commit b42ca4f2208298dec2031a7e8fb787fe62582faa
Author: Stefan Richter <s....@data-artisans.com>
Date:   2017-07-25T10:04:16Z

    [FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope Sha...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4413#discussion_r130074735
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---
    @@ -162,11 +161,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
     			stateStorage,
     			Executors.directExecutor());
     
    -		SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
    -		zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
    -
    -		verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
    --- End diff --
    
    Are these now tested in a different place? Or were they already tested somewhere else?



[GitHub] flink issue #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope SharedStat...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4413
  
    Thanks a lot for the review @aljoscha ! Merging to 1.3 ...



[GitHub] flink pull request #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope Sha...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4413#discussion_r130075052
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---
    @@ -186,6 +187,77 @@ public void testSharedStateDeRegistration() throws Exception {
     		verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
     	}
     
    +	/**
    +	 * This tests that re-registration of shared state with another registry works as expected. This simulates a
    +	 * recovery from a checkpoint, when the checkpoint coordinator creates a new shared state registry and re-registers
    +	 * all live checkpoint states.
    +	 */
    +	@Test
    +	public void testSharedStateReRegistration() throws Exception {
    +
    +		SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry());
    +
    +		// Create two state handles with overlapping shared state
    +		IncrementalKeyedStateHandle stateHandleX = create(new Random(1));
    +		IncrementalKeyedStateHandle stateHandleY = create(new Random(2));
    +		IncrementalKeyedStateHandle stateHandleZ = create(new Random(3));
    +
    +		// Now we register first time ...
    +		stateHandleX.registerSharedStates(stateRegistryA);
    +		stateHandleY.registerSharedStates(stateRegistryA);
    +		stateHandleZ.registerSharedStates(stateRegistryA);
    +
    +		try {
    +			// Second attempt should fail
    +			stateHandleX.registerSharedStates(stateRegistryA);
    +			fail("Should not be able to register twice with the same registry.");
    +		} catch (IllegalStateException ignore) {
    +		}
    +
    +		// Everything should be discarded for this handle
    +		stateHandleZ.discardState();
    +		verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState();
    +		for (StreamStateHandle stateHandle : stateHandleZ.getSharedState().values()) {
    +			verify(stateHandle, times(1)).discardState();
    +		}
    +
    +		// Close the first registry
    +		stateRegistryA.close();
    +
    +		// Attempt to register to closed registry should trigger exception
    +		try {
    +			create(new Random(4)).registerSharedStates(stateRegistryA);
    +			fail("Should not be able to register new state to closed registry.");
    +		} catch (IllegalStateException ignore) {
    +		}
    +
    +		// All state should still get dicarded
    --- End diff --
    
    Nit: discarded
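
The registration rules exercised by the test above can be sketched in isolation. This is a minimal sketch under stated assumptions: `Registry` and `Handle` are hypothetical, simplified stand-ins and not the actual Flink `SharedStateRegistry` or `IncrementalKeyedStateHandle`. It only illustrates that registering twice with the same registry is rejected, that a closed registry rejects new registrations, and that registering with a different registry moves ownership there.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for illustration; not the Flink classes.
public class ReRegistrationSketch {

    static final class Registry {
        private final Map<String, Integer> refCounts = new HashMap<>();
        private boolean open = true;

        void register(String key) {
            if (!open) {
                throw new IllegalStateException("Registry is closed.");
            }
            refCounts.merge(key, 1, Integer::sum);
        }

        int refCount(String key) {
            return refCounts.getOrDefault(key, 0);
        }

        void close() {
            open = false;
        }
    }

    static final class Handle {
        private final String sharedKey;
        private Registry registry; // registry that currently owns the shared state

        Handle(String sharedKey) {
            this.sharedKey = sharedKey;
        }

        void registerSharedStates(Registry newRegistry) {
            // Quick check: the same registry must not be used twice.
            if (registry == newRegistry) {
                throw new IllegalStateException("Already registered with this registry.");
            }
            // May throw if newRegistry is closed; ownership is then left unchanged.
            newRegistry.register(sharedKey);
            registry = newRegistry;
        }
    }

    public static void main(String[] args) {
        Registry registryA = new Registry();
        Handle handleX = new Handle("shared-1");
        handleX.registerSharedStates(registryA);

        try {
            handleX.registerSharedStates(registryA);
        } catch (IllegalStateException e) {
            System.out.println("same registry rejected");
        }

        // Simulated restart: the old registry is closed and a new one takes over.
        registryA.close();
        Registry registryB = new Registry();
        handleX.registerSharedStates(registryB);
        System.out.println(registryB.refCount("shared-1")); // prints 1
    }
}
```

The identity comparison (`==`) is a deliberate simplification here; it mirrors the "quick check" idea rather than any particular implementation.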



[GitHub] flink pull request #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope Sha...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4413#discussion_r130072157
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -1044,10 +1050,23 @@ public boolean restoreLatestCheckpointedState(
     				throw new IllegalStateException("CheckpointCoordinator is shut down");
     			}
     
    -			// Recover the checkpoints
    -			completedCheckpointStore.recover(sharedStateRegistry);
    +			// We create a new shared state registry object, so that all pending async disposal requests from previous
    +			// runs will go against the old object (were they can do nor harm). The old registry is first closed, so
    --- End diff --
    
    nit: `no harm` instead of `nor harm`
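
The idea in the quoted comment, scoping the registry object per (re)start, can be sketched as follows. This is a minimal sketch under stated assumptions: `Registry`, `Coordinator`, and all method names except `restoreLatestCheckpointedState` are hypothetical and heavily simplified, and the behavior of `unregister` on a closed registry is an assumption made for illustration, not the Flink implementation. The point is only that a disposal request still holding a reference to the old registry cannot change the reference counts of the new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for illustration; not the Flink classes.
public class RegistryPerRestartSketch {

    static final class Registry {
        private final Map<String, Integer> refCounts = new HashMap<>();
        private boolean open = true;

        synchronized void register(String key) {
            if (!open) {
                throw new IllegalStateException("Registry is closed.");
            }
            refCounts.merge(key, 1, Integer::sum);
        }

        // Assumption: unregistering is still allowed after close().
        synchronized void unregister(String key) {
            refCounts.computeIfPresent(key, (k, count) -> count > 1 ? count - 1 : null);
        }

        synchronized int refCount(String key) {
            return refCounts.getOrDefault(key, 0);
        }

        synchronized void close() {
            open = false;
        }
    }

    static final class Coordinator {
        private Registry registry = new Registry();
        private final List<String> liveSharedKeys = new ArrayList<>();

        Registry currentRegistry() {
            return registry;
        }

        void completeCheckpoint(String sharedKey) {
            registry.register(sharedKey);
            liveSharedKeys.add(sharedKey);
        }

        void restoreLatestCheckpointedState() {
            // Close the old registry first, then re-register live state with a fresh one.
            registry.close();
            registry = new Registry();
            for (String key : liveSharedKeys) {
                registry.register(key);
            }
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        coordinator.completeCheckpoint("shared-1");

        Registry oldRegistry = coordinator.currentRegistry();
        coordinator.restoreLatestCheckpointedState();

        // A late disposal request from the previous run only touches the old object.
        oldRegistry.unregister("shared-1");

        System.out.println(coordinator.currentRegistry().refCount("shared-1")); // prints 1
        System.out.println(oldRegistry.refCount("shared-1")); // prints 0
    }
}
```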



[GitHub] flink issue #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope SharedStat...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4413
  
    CC @aljoscha 



[GitHub] flink pull request #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope Sha...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4413#discussion_r130073387
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java ---
    @@ -202,10 +208,21 @@ public long getStateSize() {
     	@Override
     	public void registerSharedStates(SharedStateRegistry stateRegistry) {
     
    -		Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
    +		// This is a quick check to avoid that we register twice with the same registry. However, the code allows to
    +		// register again with a different registry. The implication is that ownership is transferred to this new
    +		// registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
    +		// SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
    +		// an old registry object from a previous run is due to GC and will never be used for registration again.
    --- End diff --
    
    "is due to be GCed"?



[GitHub] flink pull request #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope Sha...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4413#discussion_r130075732
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---
    @@ -186,6 +187,77 @@ public void testSharedStateDeRegistration() throws Exception {
     		verify(stateHandle2.getMetaStateHandle(), times(1)).discardState();
     	}
     
    +	/**
    +	 * This tests that re-registration of shared state with another registry works as expected. This simulates a
    +	 * recovery from a checkpoint, when the checkpoint coordinator creates a new shared state registry and re-registers
    +	 * all live checkpoint states.
    +	 */
    +	@Test
    +	public void testSharedStateReRegistration() throws Exception {
    +
    +		SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry());
    +
    +		// Create two state handles with overlapping shared state
    --- End diff --
    
    Comment is leftover from other test.



[GitHub] flink pull request #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope Sha...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter closed the pull request at:

    https://github.com/apache/flink/pull/4413



[GitHub] flink issue #4413: [BACKPORT-1.3][FLINK-7268] [checkpoints] Scope SharedStat...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4413
  
    Forgot to mention, good to merge, modulo the nitpicks and the additional commit that introduces the delaying `Executor`.

