You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/04 11:38:52 UTC

flink git commit: [FLINK-5246] Don't discard checkpoint messages if they are unknown

Repository: flink
Updated Branches:
  refs/heads/release-1.1 020da2ce1 -> da09d418c


[FLINK-5246] Don't discard checkpoint messages if they are unknown

This is the case if the savepoint coordinator has triggered a checkpoint. The corresponding
checkpoint messages are not known to the checkpoint coordinator and thus should not be
discarded. Instead, the JobManager will now discard all messages which have been accepted
by neither the CheckpointCoordinator nor the SavepointCoordinator.

This closes #2930.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da09d418
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da09d418
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da09d418

Branch: refs/heads/release-1.1
Commit: da09d418c1add17169368a38aeb9d793f9a2324c
Parents: 020da2c
Author: Till Rohrmann <tr...@apache.org>
Authored: Sat Dec 3 20:15:35 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Dec 4 12:38:32 2016 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java |  6 +++---
 .../flink/runtime/jobmanager/JobManager.scala     | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da09d418/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 592bafc..a3e511f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -735,15 +735,15 @@ public class CheckpointCoordinator {
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					wasPendingCheckpoint = true;
 					LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
+
+					// try to discard the state so that we don't have lingering state lying around
+					discardState(message.getState());
 				}
 				else {
 					LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
 					wasPendingCheckpoint = false;
 				}
 
-				// try to discard the state so that we don't have lingering state lying around
-				discardState(message.getState());
-
 				return wasPendingCheckpoint;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/da09d418/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9f6e2db..cbf7b5d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1376,6 +1376,24 @@ class JobManager(
                     // addressed to the periodic checkpoint coordinator.
                     log.info("Received message for non-existing checkpoint " +
                       ackMessage.getCheckpointId)
+
+                    val classLoader = Option(libraryCacheManager.getClassLoader(jid)) match {
+                      case Some(userCodeClassLoader) => userCodeClassLoader
+                      case None => getClass.getClassLoader
+                    }
+
+                    future {
+                      Option(ackMessage.getState()) match {
+                        case Some(state) =>
+                          try {
+                            state.deserializeValue(classLoader).discardState()
+                          } catch {
+                            case e: Exception => log.warn("Could not discard orphaned checkpoint " +
+                                             "state.", e)
+                          }
+                        case None =>
+                      }
+                    }(ExecutionContext.fromExecutor(ioExecutor))
                   }
                 } catch {
                   case t: Throwable =>