Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2017/10/22 09:43:59 UTC

[GitHub] flink pull request #4879: [FLINK-7783] Don't always remove checkpoints in Zo...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/4879

    [FLINK-7783] Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

    I think this will be the final version for what I started in #4863.
    
    Now, the code will retrieve checkpoints and succeed if either all of them are read or if two successive tries read the same set of checkpoints.
    
    This doesn't duplicate the test anymore but still leaves the questionable (lack of) separation of concerns in the store.
    
    R: @StefanRRichter, @tillrohrmann 
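The retrieval strategy described above can be sketched in Java. It reads until either every checkpoint is read or two successive attempts yield the same set. This is a minimal, hypothetical illustration, not the code from this pull request. `StableRetrieval`, `retrieveUntilStable`, and the generic handle/reader types are assumptions standing in for the ZooKeeper state handles and `CompletedCheckpoint` retrieval. Failed reads are modelled as unchecked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not Flink's API: illustrates the "retry until complete or stable" idea.
final class StableRetrieval {

    private StableRetrieval() {}

    /**
     * Reads every handle, retrying until either all reads succeed or two
     * consecutive attempts return the same successfully read values.
     * Note: without a retry limit this loops as long as results keep changing.
     */
    static <H, V> List<V> retrieveUntilStable(List<H> handles, Function<H, V> reader) {
        List<V> previous = null; // result of the previous attempt; null before the first one
        while (true) {
            List<V> current = new ArrayList<>(handles.size());
            for (H handle : handles) {
                try {
                    current.add(reader.apply(handle));
                } catch (RuntimeException e) {
                    // Assumption: an unreadable handle (e.g. during a DFS outage) is skipped for this attempt.
                }
            }
            // Handles are visited in the same order each time, so list equality compares the read sets.
            if (current.size() == handles.size() || current.equals(previous)) {
                return current;
            }
            previous = current;
        }
    }
}
```

The comparison between attempts relies on the `equals()` implementation of the retrieved values. The outcome therefore depends on how the checkpoint type defines equality. A production version would probably also want a retry limit or backoff.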

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-7783-zookeeper-state-store-fix-simplified3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4879.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4879
    
----
commit ca189b4c44810229331332e397523cba5417b4d6
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2017-10-22T09:40:43Z

    [FLINK-7783] Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

----


---

[GitHub] flink pull request #4879: [FLINK-7783] Don't always remove checkpoints in Zo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4879#discussion_r146160129
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
    @@ -163,22 +162,50 @@ public void recover() throws Exception {
     
    --- End diff --
    
    In line 156, I found code that considers concurrent modifications in the `ZooKeeperStateHandleStore`. Just for discussion, I wonder what happens with concurrent modifications after we have retrieved `initialCheckpoints`. Couldn't the store be modified after we obtained `initialCheckpoints` (maybe of size 1), so that the only handle suddenly becomes invalid and we can no longer recover? Maybe the code was just written in an overprotective way and this is a non-issue?


---

[GitHub] flink pull request #4879: [FLINK-7783] Don't always remove checkpoints in Zo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4879#discussion_r146159867
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
    @@ -163,22 +162,50 @@ public void recover() throws Exception {
     
     		LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
     
    -		for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
    +		// Try and read the state handles from storage. We try until we either successfully read
    +		// all of them or when we reach a stable state, i.e. when successfully read the same set
    +		// of checkpoints in two tries.
    --- End diff --
    
    Maybe we could enhance the comment to explain why this code is written this way (DFS outages), and perhaps also say a word about incremental checkpoints, to help future maintainers and/or our own memories. Unfortunately, this also signals that this code has to account for lots of implicit side effects :-(


---

[GitHub] flink pull request #4879: [FLINK-7783] Don't always remove checkpoints in Zo...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4879#discussion_r146159739
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---
    @@ -302,4 +302,32 @@ public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
     	public String toString() {
     		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
     	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +
    +		CompletedCheckpoint that = (CompletedCheckpoint) o;
    +
    +		if (checkpointID != that.checkpointID) {
    +			return false;
    +		}
    +		if (timestamp != that.timestamp) {
    --- End diff --
    
    If `checkpointID` truly fulfills an ID role, it should be the one and only attribute used in `equals()` and `hashCode()`.
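A minimal sketch of the suggestion above: `equals()` and `hashCode()` keyed only on the checkpoint ID. `CheckpointSketch` is a hypothetical, simplified stand-in for `CompletedCheckpoint`. The real class has more fields and uses a `JobID` rather than a `String`. The `toString()` format is copied from the diff. The sketch assumes that a store only holds checkpoints of a single job, so the job does not need to be compared.

```java
// Hypothetical, simplified stand-in for CompletedCheckpoint (not the real class):
// equality and hashing are based on checkpointID alone.
final class CheckpointSketch {

    private final String job;          // assumption: the real class stores a JobID, not a String
    private final long checkpointID;
    private final long timestamp;

    CheckpointSketch(String job, long checkpointID, long timestamp) {
        this.job = job;
        this.checkpointID = checkpointID;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // The ID alone identifies a checkpoint, so timestamp and job are not compared.
        return checkpointID == ((CheckpointSketch) o).checkpointID;
    }

    @Override
    public int hashCode() {
        // Kept consistent with equals(): derived from checkpointID only.
        return Long.hashCode(checkpointID);
    }

    @Override
    public String toString() {
        return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
    }
}
```

With this definition, two instances that have the same ID but different timestamps compare equal and produce the same hash code. This keeps the `equals()`/`hashCode()` contract intact.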


---

[GitHub] flink pull request #4879: [FLINK-7783] Don't always remove checkpoints in Zo...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/4879


---

[GitHub] flink pull request #4879: [FLINK-7783] Don't always remove checkpoints in Zo...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4879#discussion_r146218580
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---
    @@ -302,4 +302,32 @@ public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
     	public String toString() {
     		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
     	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +
    +		CompletedCheckpoint that = (CompletedCheckpoint) o;
    +
    +		if (checkpointID != that.checkpointID) {
    +			return false;
    +		}
    +		if (timestamp != that.timestamp) {
    --- End diff --
    
    And there will only be checkpoints of the same JobId in a checkpoint store, so including the JobId should also not be necessary.


---

[GitHub] flink issue #4879: [FLINK-7783] Don't always remove checkpoints in ZooKeeper...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4879
  
    I think overall this fix looks very good now. It implements a different strategy for dealing with problematic checkpoints, one that can recover from more scenarios than the initial PR (which @aljoscha and I discussed offline), so I prefer this approach.
    
    I only had minor comments and one talking point about potential concurrent modifications. If this turns out to be a non-issue (as I expect), I would approve this for merging.


---

[GitHub] flink issue #4879: [FLINK-7783] Don't always remove checkpoints in ZooKeeper...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4879
  
    Merged


---

[GitHub] flink pull request #4879: [FLINK-7783] Don't always remove checkpoints in Zo...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4879#discussion_r146218235
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---
    @@ -302,4 +302,32 @@ public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
     	public String toString() {
     		return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
     	}
    +
    +	@Override
    +	public boolean equals(Object o) {
    +		if (this == o) {
    +			return true;
    +		}
    +		if (o == null || getClass() != o.getClass()) {
    +			return false;
    +		}
    +
    +		CompletedCheckpoint that = (CompletedCheckpoint) o;
    +
    +		if (checkpointID != that.checkpointID) {
    +			return false;
    +		}
    +		if (timestamp != that.timestamp) {
    --- End diff --
    
    It should be, so fixing.


---

[GitHub] flink pull request #4879: [FLINK-7783] Don't always remove checkpoints in Zo...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4879#discussion_r146219117
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
    @@ -163,22 +162,50 @@ public void recover() throws Exception {
     
     		LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
     
    -		for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) {
    +		// Try and read the state handles from storage. We try until we either successfully read
    +		// all of them or when we reach a stable state, i.e. when successfully read the same set
    +		// of checkpoints in two tries.
    --- End diff --
    
    extending the comment



---