You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/01/27 15:48:55 UTC

flink git commit: [FLINK-5660] [state] Fix state cleanup of PendingCheckpoint

Repository: flink
Updated Branches:
  refs/heads/master ef96054d1 -> 009da6f6e


[FLINK-5660] [state] Fix state cleanup of PendingCheckpoint

When calling PendingCheckpoint.dispose, the state contained of a pending checkpoint
is discarded by an asynchronous task. Since this task accesses the taskStates field
we must not clear it in PendingCheckpoint.dispose. Instead we will clear it once
all state objects have been discarded from within the asynchronous task.

This closes #3220.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/009da6f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/009da6f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/009da6f6

Branch: refs/heads/master
Commit: 009da6f6e1c532a12299ee5590bb46fcecb47c32
Parents: ef96054
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 26 16:52:33 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 27 16:45:20 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/PendingCheckpoint.java   |  3 +-
 .../checkpoint/PendingCheckpointTest.java       | 42 +++++++++++++++++---
 2 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/009da6f6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1d97e12..1531f0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -416,6 +416,8 @@ public class PendingCheckpoint {
 							} catch (Exception e) {
 								LOG.warn("Could not properly dispose the pending checkpoint " +
 									"{} of job {}.", checkpointId, jobId, e);
+							} finally {
+								taskStates.clear();
 							}
 						}
 					});
@@ -423,7 +425,6 @@ public class PendingCheckpoint {
 				}
 			} finally {
 				discarded = true;
-				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/009da6f6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 71013bd..4358526 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -31,8 +31,11 @@ import org.mockito.Mockito;
 
 import java.io.File;
 import java.lang.reflect.Field;
+import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -170,41 +173,50 @@ public class PendingCheckpointTest {
 	public void testAbortDiscardsState() throws Exception {
 		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 		TaskState state = mock(TaskState.class);
+		QueueExecutor executor = new QueueExecutor();
 
 		String targetDir = tmpFolder.newFolder().getAbsolutePath();
 
 		// Abort declined
-		PendingCheckpoint pending = createPendingCheckpoint(props, targetDir);
+		PendingCheckpoint pending = createPendingCheckpoint(props, targetDir, executor);
 		setTaskState(pending, state);
 
 		pending.abortDeclined();
+		// execute asynchronous discard operation
+		executor.runQueuedCommands();
 		verify(state, times(1)).discardState();
 
 		// Abort error
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint(props, targetDir);
+		pending = createPendingCheckpoint(props, targetDir, executor);
 		setTaskState(pending, state);
 
 		pending.abortError(new Exception("Expected Test Exception"));
+		// execute asynchronous discard operation
+		executor.runQueuedCommands();
 		verify(state, times(1)).discardState();
 
 		// Abort expired
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint(props, targetDir);
+		pending = createPendingCheckpoint(props, targetDir, executor);
 		setTaskState(pending, state);
 
 		pending.abortExpired();
+		// execute asynchronous discard operation
+		executor.runQueuedCommands();
 		verify(state, times(1)).discardState();
 
 		// Abort subsumed
 		Mockito.reset(state);
 
-		pending = createPendingCheckpoint(props, targetDir);
+		pending = createPendingCheckpoint(props, targetDir, executor);
 		setTaskState(pending, state);
 
 		pending.abortSubsumed();
+		// execute asynchronous discard operation
+		executor.runQueuedCommands();
 		verify(state, times(1)).discardState();
 	}
 
@@ -270,6 +282,10 @@ public class PendingCheckpointTest {
 	// ------------------------------------------------------------------------
 
 	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
+		return createPendingCheckpoint(props, targetDirectory, Executors.directExecutor());
+	}
+
+	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory, Executor executor) {
 		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
 		return new PendingCheckpoint(
 			new JobID(),
@@ -278,7 +294,7 @@ public class PendingCheckpointTest {
 			ackTasks,
 			props,
 			targetDirectory,
-			Executors.directExecutor());
+			executor);
 	}
 
 	@SuppressWarnings("unchecked")
@@ -289,4 +305,20 @@ public class PendingCheckpointTest {
 
 		taskStates.put(new JobVertexID(), state);
 	}
+
+	private static final class QueueExecutor implements Executor {
+
+		private final Queue<Runnable> queue = new ArrayDeque<>(4);
+
+		@Override
+		public void execute(Runnable command) {
+			queue.add(command);
+		}
+
+		public void runQueuedCommands() {
+			for (Runnable runnable : queue) {
+				runnable.run();
+			}
+		}
+	}
 }