Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/02/26 10:33:52 UTC

[GitHub] flink pull request #5578: Improve resource release for local recovery

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/5578

    Improve resource release for local recovery

    ## What is the purpose of the change
    
    This PR fixes [FLINK-8777](https://issues.apache.org/jira/browse/FLINK-8777). When recovering from a failure, `TaskLocalStateStoreImpl.retrieveLocalState()` is invoked. At that point we can release every entry in `storedTaskStateByCheckpointID` that does not satisfy `entry.checkpointID == checkpointID`. This prevents a resource leak when a job loops through `local checkpoint completed => failed => local checkpoint completed => failed ...`.
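    
    A minimal sketch of the idea, assuming a simplified store: `SketchLocalStateStore` and `Snapshot` are hypothetical stand-ins for `TaskLocalStateStoreImpl` and `TaskStateSnapshot`, and discarding happens synchronously after the lock is released rather than asynchronously. On retrieval, every entry whose key differs from the requested checkpoint ID is removed and discarded:
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;
    
    // Hypothetical stand-in for TaskStateSnapshot; counts discards for illustration.
    class Snapshot {
        int discardCount = 0;
    
        void discardState() {
            discardCount++;
        }
    }
    
    // Hypothetical, simplified version of TaskLocalStateStoreImpl.
    class SketchLocalStateStore {
        private final Object lock = new Object();
        private final SortedMap<Long, Snapshot> storedTaskStateByCheckpointID = new TreeMap<>();
    
        void storeLocalState(long checkpointId, Snapshot localState) {
            synchronized (lock) {
                storedTaskStateByCheckpointID.put(checkpointId, localState);
            }
        }
    
        Snapshot retrieveLocalState(long checkpointID) {
            final List<Snapshot> toDiscard = new ArrayList<>();
            final Snapshot snapshot;
            synchronized (lock) {
                snapshot = storedTaskStateByCheckpointID.get(checkpointID);
                // Only the entry for checkpointID is useful for recovery; remove all others.
                Iterator<Map.Entry<Long, Snapshot>> it =
                    storedTaskStateByCheckpointID.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, Snapshot> entry = it.next();
                    if (entry.getKey() != checkpointID) {
                        toDiscard.add(entry.getValue());
                        it.remove();
                    }
                }
            }
            // Discard outside the lock (the real store hands this off asynchronously).
            for (Snapshot s : toDiscard) {
                s.discardState();
            }
            return snapshot;
        }
    
        int size() {
            synchronized (lock) {
                return storedTaskStateByCheckpointID.size();
            }
        }
    }
    ```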
    
    ## Brief change log
    
      - Release resources in `retrieveLocalState()`.
      - Change the type of `toDiscard` from a `Map` to a single entry.
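    
    The second item works because `storeLocalState()` discards at most one snapshot per call: either the incoming state (a late store after disposal) or the state it replaces under the same checkpoint ID. A hedged sketch of that pattern; `StateSnapshot`, `SingleEntryDiscardSketch`, and the `discardAll` helper (standing in for `asyncDiscardLocalStateForCollection`) are hypothetical:
    
    ```java
    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical stand-in for TaskStateSnapshot.
    class StateSnapshot {
        final String name;
    
        StateSnapshot(String name) {
            this.name = name;
        }
    }
    
    class SingleEntryDiscardSketch {
        private final Object lock = new Object();
        private final Map<Long, StateSnapshot> storedTaskStateByCheckpointID = new HashMap<>();
        private boolean disposed = false;
        // Records what would have been handed to the (asynchronous) discard logic.
        final List<StateSnapshot> discarded = new ArrayList<>();
    
        void storeLocalState(long checkpointId, StateSnapshot localState) {
            // At most one snapshot needs discarding, so a nullable entry replaces the map.
            Map.Entry<Long, StateSnapshot> toDiscard = null;
            synchronized (lock) {
                if (disposed) {
                    // Late store after disposal: discard the incoming state.
                    toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, localState);
                } else {
                    StateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState);
                    if (previous != null) {
                        // Same checkpoint ID stored twice: discard the replaced state.
                        toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, previous);
                    }
                }
            }
            if (toDiscard != null) {
                discardAll(Collections.singletonList(toDiscard));
            }
        }
    
        void dispose() {
            synchronized (lock) {
                disposed = true;
            }
        }
    
        // Hypothetical stand-in for asyncDiscardLocalStateForCollection(...).
        private void discardAll(Collection<Map.Entry<Long, StateSnapshot>> entries) {
            for (Map.Entry<Long, StateSnapshot> e : entries) {
                discarded.add(e.getValue());
            }
        }
    }
    ```
    
    Apart from the diamond operator `<>` used for brevity, the entry construction mirrors the diff.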
    
    ## Verifying this change
    
    This change can be verified by the existing tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink improve_resource_release_for_local_recovery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5578.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5578
    
----
commit 223aa37e625e81e73ccb5543fedd99ea72723ead
Author: sihuazhou <su...@...>
Date:   2018-02-26T02:46:51Z

    release local-checkpoint-data in retrieveLocalState.

commit 5373b394aaac8ae5010a251ae80d3a3b6ee61dc3
Author: sihuazhou <su...@...>
Date:   2018-02-26T02:54:53Z

    change the type of toDiscard from Map to a single entry.

----


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171130718
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -90,6 +92,9 @@
     	@GuardedBy("lock")
     	private boolean disposed;
     
    +	/** Whether to discard the useless state when retrieve local checkpoint state. */
    +	private boolean retrieveWithDiscard = true;
    --- End diff --
    
    Aha, this is just for passing the existing test case in `TaskLocalStateStoreImplTest` ... 
    ```java
    	private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int len) throws Exception {
    		for (int i = off; i < len; ++i) {
    			TaskStateSnapshot expected = history.get(i);
    			Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i));
    			Mockito.verify(expected, Mockito.never()).discardState();
    		}
    	}
    ```
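    
    To illustrate why the existing test conflicts with pruning on retrieval: once `retrieveLocalState(i)` removes every other checkpoint, a loop that retrieves several IDs in turn only finds the first one. A hypothetical, simplified `PruningStore` (not Flink code) shows the effect:
    
    ```java
    import java.util.Map;
    import java.util.TreeMap;
    
    // Hypothetical simplified store whose retrieval prunes all other checkpoints.
    class PruningStore {
        private final Map<Long, String> stateByCheckpointId = new TreeMap<>();
    
        void store(long checkpointId, String state) {
            stateByCheckpointId.put(checkpointId, state);
        }
    
        String retrieve(long checkpointId) {
            String state = stateByCheckpointId.get(checkpointId);
            // Prune everything except the requested checkpoint.
            stateByCheckpointId.keySet().removeIf(id -> id != checkpointId);
            return state;
        }
    }
    ```
    
    Retrieving checkpoint 0 succeeds, but checkpoint 1 has already been pruned by then, so a loop like `checkStoredAsExpected` fails on its second iteration.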


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170965652
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -90,6 +92,10 @@
     	@GuardedBy("lock")
     	private boolean disposed;
     
    +	/** Whether to discard the useless state when retrieve local checkpoint state. */
    +	@Nonnull
    --- End diff --
    
    addressed.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170965699
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -285,4 +316,9 @@ public String toString() {
     			", localRecoveryConfig=" + localRecoveryConfig +
     			'}';
     	}
    +
    +	@VisibleForTesting
    +	void setRetrieveWithDiscard(@Nonnull boolean retrieveWithDiscard) {
    --- End diff --
    
    addressed.


---

[GitHub] flink issue #5578: [FLINK-8777][state]Improve resource release for local rec...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5578
  
    Ok, I took another look at the complete picture, and the test gave me the feeling that retrieval and pruning should be two separate concerns: not only should we have two internal methods, but maybe we should also expose them as different methods. For the sake of keeping this short, I made a proposal in this branch:
    
    https://github.com/StefanRRichter/flink/tree/improve_resource_release_for_local_recovery
    
    If you like the change, I would squash it and commit under your name, because you did all of the important parts. What do you think?
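    
    A hedged sketch of what separating the two concerns could look like at the API level. The names `LocalStateStoreSketch`, `MapBackedStoreSketch`, and `pruneMatchingCheckpoints` are assumptions for illustration and are not taken from the linked branch. A recovery path would first call `pruneMatchingCheckpoints(id -> id != restoreId)` and then `retrieveLocalState(restoreId)`:
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.LongPredicate;
    
    // Hypothetical interface separating retrieval from pruning.
    interface LocalStateStoreSketch {
        String retrieveLocalState(long checkpointID);
    
        void pruneMatchingCheckpoints(LongPredicate matcher);
    }
    
    class MapBackedStoreSketch implements LocalStateStoreSketch {
        final Map<Long, String> stateByCheckpointId = new HashMap<>();
    
        @Override
        public String retrieveLocalState(long checkpointID) {
            // Pure retrieval: no side effects on other checkpoints.
            return stateByCheckpointId.get(checkpointID);
        }
    
        @Override
        public void pruneMatchingCheckpoints(LongPredicate matcher) {
            // Pure pruning: remove every checkpoint the matcher accepts.
            stateByCheckpointId.keySet().removeIf(id -> matcher.test(id));
        }
    }
    ```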


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170953600
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -126,30 +133,54 @@ public void storeLocalState(
     		LOG.info("Storing local state for checkpoint {}.", checkpointId);
     		LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
     
    -		Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
    +		Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
     
     		synchronized (lock) {
     			if (disposed) {
     				// we ignore late stores and simply discard the state.
    -				toDiscard.put(checkpointId, localState);
    +				toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, localState);
     			} else {
     				TaskStateSnapshot previous =
     					storedTaskStateByCheckpointID.put(checkpointId, localState);
     
     				if (previous != null) {
    -					toDiscard.put(checkpointId, previous);
    +					toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, previous);
     				}
     			}
     		}
     
    -		asyncDiscardLocalStateForCollection(toDiscard.entrySet());
    +		if (toDiscard != null) {
    +			asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
    +		}
     	}
     
     	@Override
     	@Nullable
     	public TaskStateSnapshot retrieveLocalState(long checkpointID) {
     		synchronized (lock) {
     			TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
    +
    +			Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
    +				storedTaskStateByCheckpointID.entrySet().iterator();
    +
    +			if (retrieveWithDiscard) {
    +				// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
    +				final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
    --- End diff --
    
    I think we can de-duplicate code by extracting a method with the code from `confirmCheckpoint(...)`, maybe called `pruneCheckpoints(...)`. We can do the comparison for both use-cases as `entryCheckpointId != checkpointID` and have a boolean parameter which determines if we break the iteration in the `else` case or not.
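    
    One reading of this suggestion, as a sketch over a `TreeMap` iterated in ascending checkpoint-ID order (`PruneWithBreakSketch` and the `String` state type are hypothetical simplifications). With `breakTheIteration == false` it prunes everything except `checkpointID`, as needed on retrieval; with `breakTheIteration == true` it removes older entries and stops at `checkpointID`, presumably as the existing `confirmCheckpoint(...)` loop does:
    
    ```java
    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;
    
    class PruneWithBreakSketch {
        // Ascending order by checkpoint ID, like the sorted map in the store.
        final TreeMap<Long, String> storedByCheckpointID = new TreeMap<>();
    
        void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
            Iterator<Map.Entry<Long, String>> entryIterator =
                storedByCheckpointID.entrySet().iterator();
            while (entryIterator.hasNext()) {
                long entryCheckpointId = entryIterator.next().getKey();
                if (entryCheckpointId != checkpointID) {
                    entryIterator.remove();
                } else if (breakTheIteration) {
                    // Confirmation case: keep checkpointID and everything newer.
                    break;
                }
            }
        }
    }
    ```
    
    If `checkpointID` is not in the map, the breaking variant never reaches `break` and also removes newer entries, which is the risk raised in the review comment below on the same hunk.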


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170982734
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
     		}
     	}
     
    +	/**
    +	 * Pruning the useless checkpoints.
    +	 */
    +	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
    +
    +		Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
    +			storedTaskStateByCheckpointID.entrySet().iterator();
    +
    +		final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
    +
    +		while (entryIterator.hasNext()) {
    +
    +			Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
    +			long entryCheckpointId = snapshotEntry.getKey();
    +
    +			if (entryCheckpointId != checkpointID) {
    --- End diff --
    
    On second thought, while I think this code is currently correct, the breaking case looks a bit dangerous: if the checkpoint ID is not present, the iteration would never break and would also prune ongoing checkpoints. I wonder if we should make the `if` a bit more complex but safer (checking that the breaking case never goes past the checkpoint ID). What do you think?
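    
    A sketch of one safer comparison under the same simplifying assumptions (hypothetical `SaferPruneSketch`, `String` state): in the breaking case, only entries strictly older than `checkpointID` are removed and iteration stops at the first entry that is not older, so newer, possibly ongoing checkpoints survive even when `checkpointID` itself is absent:
    
    ```java
    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;
    
    class SaferPruneSketch {
        // Ascending order by checkpoint ID.
        final TreeMap<Long, String> storedByCheckpointID = new TreeMap<>();
    
        void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
            Iterator<Map.Entry<Long, String>> entryIterator =
                storedByCheckpointID.entrySet().iterator();
            while (entryIterator.hasNext()) {
                long entryCheckpointId = entryIterator.next().getKey();
                if (breakTheIteration) {
                    // Confirmation case: only strictly older checkpoints are pruned.
                    if (entryCheckpointId < checkpointID) {
                        entryIterator.remove();
                    } else {
                        break;
                    }
                } else if (entryCheckpointId != checkpointID) {
                    // Retrieval case: everything except checkpointID is pruned.
                    entryIterator.remove();
                }
            }
        }
    }
    ```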


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171130767
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
     		TaskStateSnapshot snapshot;
     		synchronized (lock) {
     			snapshot = storedTaskStateByCheckpointID.get(checkpointID);
    +
    +			if (retrieveWithDiscard) {
    +				// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171178975
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -300,6 +295,34 @@ private void deleteDirectory(File directory) throws IOException {
     		}
     	}
     
    +	/**
    +	 * Pruning the useless checkpoints, it should be called only when holding the {@link #lock}.
    +	 */
    +	private void pruneCheckpoints(Predicate<Long> pruningChecker, boolean breakOnceCheckerFalse) {
    --- End diff --
    
    👍 I prefer to use `LongPredicate`, addressing ...


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171179946
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -90,6 +92,9 @@
     	@GuardedBy("lock")
     	private boolean disposed;
     
    +	/** Whether to discard the useless state when retrieve local checkpoint state. */
    +	private boolean retrieveWithDiscard = true;
    --- End diff --
    
    Then there are two better options in my opinion, because the flag is pure boilerplate:
    
    - Change the test to check what we are doing now, because that is what happens in the real use-case.
    - Maybe even better: split the method `retrieveLocalState` further: one method for pruning, one package-private method that does all the pure retrieval, logging, and `null` transformation. In the old `retrieveLocalState`, do the cleanup first, then the pure retrieval/logging. Call the package private method in the test.
    
    Maybe the test should then also just do both?
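    
    A hedged sketch of the second option. All names are illustrative (`SplitRetrievalStore`, `retrieveLocalStateInternal`, `pruneAllExcept`), the state is a plain `String`, and logging and the `null` transformation are omitted. The public method prunes first and then delegates to a package-private method that a test can call directly without pruning:
    
    ```java
    import java.util.Map;
    import java.util.TreeMap;
    
    class SplitRetrievalStore {
        private final Object lock = new Object();
        private final Map<Long, String> stateByCheckpointId = new TreeMap<>();
    
        void storeLocalState(long checkpointId, String state) {
            synchronized (lock) {
                stateByCheckpointId.put(checkpointId, state);
            }
        }
    
        // Used on recovery: clean up first, then do the pure retrieval.
        String retrieveLocalState(long checkpointID) {
            synchronized (lock) {
                pruneAllExcept(checkpointID);
                return retrieveLocalStateInternal(checkpointID);
            }
        }
    
        // Package-private pure retrieval; a test can call this without pruning.
        // Java monitors are reentrant, so calling this while holding lock is fine.
        String retrieveLocalStateInternal(long checkpointID) {
            synchronized (lock) {
                return stateByCheckpointId.get(checkpointID);
            }
        }
    
        // Must be called while holding lock.
        private void pruneAllExcept(long checkpointID) {
            stateByCheckpointId.keySet().removeIf(id -> id != checkpointID);
        }
    
        int size() {
            synchronized (lock) {
                return stateByCheckpointId.size();
            }
        }
    }
    ```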


---

[GitHub] flink issue #5578: Improve resource release for local recovery

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5578
  
    @StefanRRichter Could you please have a look at this?


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170975985
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -90,6 +92,9 @@
     	@GuardedBy("lock")
     	private boolean disposed;
     
    +	/** Whether to discard the useless state when retrieve local checkpoint state. */
    +	private boolean retrieveWithDiscard = true;
    --- End diff --
    
    Why do we need this? Is there any case for not doing the cleanup?


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171177186
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
     		}
     	}
     
    +	/**
    +	 * Pruning the useless checkpoints.
    +	 */
    +	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
    +
    +		Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
    +			storedTaskStateByCheckpointID.entrySet().iterator();
    +
    +		final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
    +
    +		while (entryIterator.hasNext()) {
    +
    +			Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
    +			long entryCheckpointId = snapshotEntry.getKey();
    +
    +			if (entryCheckpointId != checkpointID) {
    --- End diff --
    
    That is fine, from my point of view that is just one way of making the `if` more complex.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5578


---

[GitHub] flink issue #5578: [FLINK-8777][state]Improve resource release for local rec...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5578
  
    @StefanRRichter Thanks, I like it! It looks very good.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170975679
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -159,6 +166,11 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
     		TaskStateSnapshot snapshot;
     		synchronized (lock) {
     			snapshot = storedTaskStateByCheckpointID.get(checkpointID);
    +
    +			if (retrieveWithDiscard) {
    +				// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
    --- End diff --
    
    Comment is no longer required.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171181196
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -90,6 +92,9 @@
     	@GuardedBy("lock")
     	private boolean disposed;
     
    +	/** Whether to discard the useless state when retrieve local checkpoint state. */
    +	private boolean retrieveWithDiscard = true;
    --- End diff --
    
    Agreed and I prefer the second option.


---

[GitHub] flink issue #5578: [FLINK-8777][state]Improve resource release for local rec...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5578
  
    Thanks, I will merge this once the final points are addressed :-)


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170965807
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -90,6 +92,10 @@
     	@GuardedBy("lock")
     	private boolean disposed;
     
    +	/** Whether to discard the useless state when retrieve local checkpoint state. */
    +	@Nonnull
    +	private boolean retrieveWithDiscard;
    --- End diff --
    
    addressed.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171177343
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -300,6 +295,34 @@ private void deleteDirectory(File directory) throws IOException {
     		}
     	}
     
    +	/**
    +	 * Pruning the useless checkpoints, it should be called only when holding the {@link #lock}.
    +	 */
    +	private void pruneCheckpoints(Predicate<Long> pruningChecker, boolean breakOnceCheckerFalse) {
    --- End diff --
    
    We can use `LongPredicate` instead of `Predicate<Long>`
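    
    `java.util.function.LongPredicate` is the primitive specialization of `Predicate<Long>`: its `test(long)` method accepts a primitive, so checking a checkpoint ID does not require boxing it into a `Long`. A small, hypothetical comparison:
    
    ```java
    import java.util.function.LongPredicate;
    import java.util.function.Predicate;
    
    class PredicateBoxingSketch {
        // The primitive argument is autoboxed via Long.valueOf, which may allocate.
        static boolean viaBoxed(Predicate<Long> checker, long checkpointId) {
            return checker.test(checkpointId);
        }
    
        // Primitive specialization: the long is passed as-is, no Long object needed.
        static boolean viaPrimitive(LongPredicate checker, long checkpointId) {
            return checker.test(checkpointId);
        }
    }
    ```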


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170947390
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -126,30 +133,54 @@ public void storeLocalState(
     		LOG.info("Storing local state for checkpoint {}.", checkpointId);
     		LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
     
    -		Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
    +		Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
     
     		synchronized (lock) {
     			if (disposed) {
     				// we ignore late stores and simply discard the state.
    -				toDiscard.put(checkpointId, localState);
    +				toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, localState);
     			} else {
     				TaskStateSnapshot previous =
     					storedTaskStateByCheckpointID.put(checkpointId, localState);
     
     				if (previous != null) {
    -					toDiscard.put(checkpointId, previous);
    +					toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, previous);
     				}
     			}
     		}
     
    -		asyncDiscardLocalStateForCollection(toDiscard.entrySet());
    +		if (toDiscard != null) {
    +			asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
    +		}
     	}
     
     	@Override
     	@Nullable
     	public TaskStateSnapshot retrieveLocalState(long checkpointID) {
     		synchronized (lock) {
     			TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
    +
    +			Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
    --- End diff --
    
    I would move all the cleanup logic into a separate method that is just invoked here, to separate the concerns.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170965755
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -126,30 +133,54 @@ public void storeLocalState(
     		LOG.info("Storing local state for checkpoint {}.", checkpointId);
     		LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
     
    -		Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
    +		Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
     
     		synchronized (lock) {
     			if (disposed) {
     				// we ignore late stores and simply discard the state.
    -				toDiscard.put(checkpointId, localState);
    +				toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, localState);
     			} else {
     				TaskStateSnapshot previous =
     					storedTaskStateByCheckpointID.put(checkpointId, localState);
     
     				if (previous != null) {
    -					toDiscard.put(checkpointId, previous);
    +					toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, previous);
     				}
     			}
     		}
     
    -		asyncDiscardLocalStateForCollection(toDiscard.entrySet());
    +		if (toDiscard != null) {
    +			asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
    +		}
     	}
     
     	@Override
     	@Nullable
     	public TaskStateSnapshot retrieveLocalState(long checkpointID) {
     		synchronized (lock) {
     			TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
    +
    +			Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
    +				storedTaskStateByCheckpointID.entrySet().iterator();
    +
    +			if (retrieveWithDiscard) {
    +				// Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
    +				final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
    --- End diff --
    
    👍 addressed.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170948561
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -285,4 +316,9 @@ public String toString() {
     			", localRecoveryConfig=" + localRecoveryConfig +
     			'}';
     	}
    +
    +	@VisibleForTesting
    +	void setRetrieveWithDiscard(@Nonnull boolean retrieveWithDiscard) {
    --- End diff --
    
    remove `@Nonnull`


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171130750
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
     		}
     	}
     
    +	/**
    +	 * Pruning the useless checkpoints.
    +	 */
    +	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
    +
    --- End diff --
    
    👍 


---

[GitHub] flink issue #5578: [FLINK-8777][state]Improve resource release for local rec...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5578
  
    @StefanRRichter I have addressed your suggestions, except the one about making the `if` a bit more complex; instead, I introduced a `Predicate` parameter for `pruneCheckpoints()`. I'm not sure whether that is OK with you. If you are still against it, I'm happy to change the code to make the `if` a bit more complex instead.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170975600
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
     		}
     	}
     
    +	/**
    +	 * Pruning the useless checkpoints.
    +	 */
    +	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
    +
    --- End diff --
    
    I suggest adding an assert that the thread holds `lock`, and documenting that this method should only be called while holding the lock.
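    
    The suggested check can be written with `Thread.holdsLock(Object)`, which returns `true` if the current thread holds the monitor of the given object. A minimal sketch (`LockCheckedStore` and `pruneOlderThan` are hypothetical names); note that Java `assert` statements are only evaluated when assertions are enabled, for example with `-ea`:
    
    ```java
    import java.util.TreeMap;
    
    class LockCheckedStore {
        final Object lock = new Object();
        final TreeMap<Long, String> storedByCheckpointID = new TreeMap<>();
    
        /**
         * Prunes all checkpoints older than checkpointID.
         * Must only be called while holding {@link #lock}.
         */
        void pruneOlderThan(long checkpointID) {
            assert Thread.holdsLock(lock) : "pruneOlderThan must be called while holding the lock";
            // headMap(k) is a view of the keys strictly less than k; clearing it removes them from the map.
            storedByCheckpointID.headMap(checkpointID).clear();
        }
    }
    ```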


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r171133066
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -300,6 +291,32 @@ private void deleteDirectory(File directory) throws IOException {
     		}
     	}
     
    +	/**
    +	 * Pruning the useless checkpoints.
    +	 */
    +	private void pruneCheckpoints(long checkpointID, boolean breakTheIteration) {
    +
    +		Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
    +			storedTaskStateByCheckpointID.entrySet().iterator();
    +
    +		final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
    +
    +		while (entryIterator.hasNext()) {
    +
    +			Map.Entry<Long, TaskStateSnapshot> snapshotEntry = entryIterator.next();
    +			long entryCheckpointId = snapshotEntry.getKey();
    +
    +			if (entryCheckpointId != checkpointID) {
    --- End diff --
    
    I agree with you that the breaking case looks a bit dangerous... I think we could pass a `Predicate` for the `if` and let the caller pass it into this function. This makes the caller side cleaner and avoids cramming all the logic into the `if`.
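    
    A hedged sketch of this proposal, using `Predicate<Long>` as in the revised diff; `PredicatePruneSketch`, its `String` state type, and the simplified caller methods are hypothetical stand-ins. Each caller expresses its pruning rule as a lambda, so the loop itself needs no special cases:
    
    ```java
    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.function.Predicate;
    
    class PredicatePruneSketch {
        private final Object lock = new Object();
        // Ascending order by checkpoint ID.
        final TreeMap<Long, String> storedByCheckpointID = new TreeMap<>();
    
        // Recovery: keep only the checkpoint being restored.
        String retrieveLocalState(long checkpointID) {
            synchronized (lock) {
                pruneCheckpoints(id -> id != checkpointID, false);
                return storedByCheckpointID.get(checkpointID);
            }
        }
    
        // Confirmation: drop strictly older checkpoints, stop at the first one that is not older.
        void confirmCheckpoint(long confirmedCheckpointId) {
            synchronized (lock) {
                pruneCheckpoints(id -> id < confirmedCheckpointId, true);
            }
        }
    
        // Must be called while holding lock.
        private void pruneCheckpoints(Predicate<Long> pruningChecker, boolean breakOnceCheckerFalse) {
            Iterator<Map.Entry<Long, String>> entryIterator =
                storedByCheckpointID.entrySet().iterator();
            while (entryIterator.hasNext()) {
                Long entryCheckpointId = entryIterator.next().getKey();
                if (pruningChecker.test(entryCheckpointId)) {
                    entryIterator.remove();
                } else if (breakOnceCheckerFalse) {
                    break;
                }
            }
        }
    }
    ```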


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170965682
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -126,30 +133,54 @@ public void storeLocalState(
     		LOG.info("Storing local state for checkpoint {}.", checkpointId);
     		LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
     
    -		Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
    +		Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
     
     		synchronized (lock) {
     			if (disposed) {
     				// we ignore late stores and simply discard the state.
    -				toDiscard.put(checkpointId, localState);
    +				toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, localState);
     			} else {
     				TaskStateSnapshot previous =
     					storedTaskStateByCheckpointID.put(checkpointId, localState);
     
     				if (previous != null) {
    -					toDiscard.put(checkpointId, previous);
    +					toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, previous);
     				}
     			}
     		}
     
    -		asyncDiscardLocalStateForCollection(toDiscard.entrySet());
    +		if (toDiscard != null) {
    +			asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
    +		}
     	}
     
     	@Override
     	@Nullable
     	public TaskStateSnapshot retrieveLocalState(long checkpointID) {
     		synchronized (lock) {
     			TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
    +
    +			Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
    --- End diff --
    
    addressed.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170943110
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -90,6 +92,10 @@
     	@GuardedBy("lock")
     	private boolean disposed;
     
    +	/** Whether to discard the useless state when retrieve local checkpoint state. */
    +	@Nonnull
    --- End diff --
    
    This annotation does not fit here.


---

[GitHub] flink pull request #5578: [FLINK-8777][state]Improve resource release for lo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170946385
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -90,6 +92,10 @@
     	@GuardedBy("lock")
     	private boolean disposed;
     
    +	/** Whether to discard the useless state when retrieve local checkpoint state. */
    +	@Nonnull
    +	private boolean retrieveWithDiscard;
    --- End diff --
    
    Why not make this a general default?


---