You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 23:35:32 UTC

[1/3] flink git commit: [FLINK-5006] [streaming] Remove assumption of order in SystemProcessingTimeServiceTest

Repository: flink
Updated Branches:
  refs/heads/master a5ec8494c -> a1362c3af


[FLINK-5006] [streaming] Remove assumption of order in SystemProcessingTimeServiceTest

This closes #2785


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1362c3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1362c3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1362c3a

Branch: refs/heads/master
Commit: a1362c3af25304b4120232527a2e6008df315de7
Parents: 72b295b
Author: Boris Osipov <bo...@epam.com>
Authored: Thu Nov 10 18:52:56 2016 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 17 00:34:55 2016 +0100

----------------------------------------------------------------------
 .../tasks/SystemProcessingTimeService.java      |  8 +++
 .../tasks/SystemProcessingTimeServiceTest.java  | 68 --------------------
 2 files changed, 8 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1362c3a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 153aedf..071dbce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -90,6 +90,14 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 		return System.currentTimeMillis();
 	}
 
+	/**
+	 * Registers a task to be executed no sooner than time {@code timestamp}, but without strong guarantees
+	 * about the order in which registered timers fire.
+	 *
+	 * @param timestamp Time when the task is to be enabled (in processing time)
+	 * @param target    The task to be executed
+	 * @return The future that represents the scheduled task. This always returns some future,
+	 *         even if the timer was shut down.
+	 */
 	@Override
 	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
 		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/a1362c3a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 797e18a..766b313 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTes
 
 import org.junit.Test;
 
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -242,71 +241,4 @@ public class SystemProcessingTimeServiceTest {
 		latch.await();
 		assertTrue(exceptionWasThrown.get());
 	}
-
-	@Test
-	public void testTimerSorting() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(errorRef), lock);
-
-		try {
-			final OneShotLatch sync = new OneShotLatch();
-
-			// we block the timer execution to make sure we have all the time
-			// to register some additional timers out of order
-			timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
-				@Override
-				public void onProcessingTime(long timestamp) throws Exception {
-					sync.await();
-				}
-			});
-			
-			// schedule two timers out of order something
-			final long now = System.currentTimeMillis();
-			final long time1 = now + 6;
-			final long time2 = now + 5;
-			final long time3 = now + 8;
-			final long time4 = now - 2;
-
-			final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
-			ProcessingTimeCallback trigger = new ProcessingTimeCallback() {
-				@Override
-				public void onProcessingTime(long timestamp) {
-					timestamps.add(timestamp);
-				}
-			};
-
-			// schedule
-			ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger);
-			ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger);
-			ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger);
-			ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger);
-
-			// now that everything is scheduled, unblock the timer service
-			sync.trigger();
-
-			// wait until both are complete
-			future1.get();
-			future2.get();
-			future3.get();
-			future4.get();
-
-			// verify that the order is 4 - 2 - 1 - 3
-			assertEquals(4, timestamps.size());
-			assertEquals(time4, timestamps.take().longValue());
-			assertEquals(time2, timestamps.take().longValue());
-			assertEquals(time1, timestamps.take().longValue());
-			assertEquals(time3, timestamps.take().longValue());
-
-			// check that no asynchronous error was reported
-			if (errorRef.get() != null) {
-				throw new Exception(errorRef.get());
-			}
-		}
-		finally {
-			timer.shutdownService();
-		}
-	}
 }


[2/3] flink git commit: [FLINK-5063] [checkpointing] Discard state handles of declined or expired state handles

Posted by se...@apache.org.
[FLINK-5063] [checkpointing] Discard state handles of declined or expired state handles

Whenever the checkpoint coordinator receives an acknowledge checkpoint message which belongs
to the job maintained by the checkpoint coordinator, it should either record the state handles
for later processing or discard them to free the resources. The latter case can happen if a
checkpoint has expired and late acknowledge checkpoint messages arrive. Furthermore, it
can happen if a Task sent a decline checkpoint message while other Tasks were still taking
a checkpoint. This PR changes the behaviour such that state handles belonging to the job of
the checkpoint coordinator are discarded if they could not be added to the PendingCheckpoint.

This closes #2812


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72b295b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72b295b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72b295b3

Branch: refs/heads/master
Commit: 72b295b3b52dff2d0bc5b78881826e8936c370ff
Parents: bf06a1c
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 14 18:33:55 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 17 00:34:55 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  80 ++++++++++----
 .../runtime/checkpoint/PendingCheckpoint.java   |  57 +++++++---
 .../checkpoint/CheckpointCoordinatorTest.java   | 107 +++++++++++++++++++
 3 files changed, 209 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72b295b3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8088c3d..886409d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -638,35 +638,62 @@ public class CheckpointCoordinator {
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
 				isPendingCheckpoint = true;
 
-				if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
-					if (checkpoint.isFullyAcknowledged()) {
-						completed = checkpoint.finalizeCheckpoint();
+				switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
+					case SUCCESS:
+						if (checkpoint.isFullyAcknowledged()) {
+							completed = checkpoint.finalizeCheckpoint();
 
-						completedCheckpointStore.addCheckpoint(completed);
+							completedCheckpointStore.addCheckpoint(completed);
 
-						LOG.info("Completed checkpoint " + checkpointId + " (in " +
+							LOG.info("Completed checkpoint " + checkpointId + " (in " +
 								completed.getDuration() + " ms)");
 
-						if (LOG.isDebugEnabled()) {
-							StringBuilder builder = new StringBuilder();
-							for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
-								builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
-							}
+							if (LOG.isDebugEnabled()) {
+								StringBuilder builder = new StringBuilder();
+								for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) {
+									builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}");
+								}
 
-							LOG.debug(builder.toString());
-						}
+								LOG.debug(builder.toString());
+							}
 
-						pendingCheckpoints.remove(checkpointId);
-						rememberRecentCheckpointId(checkpointId);
+							pendingCheckpoints.remove(checkpointId);
+							rememberRecentCheckpointId(checkpointId);
 
-						dropSubsumedCheckpoints(completed.getCheckpointID());
+							dropSubsumedCheckpoints(completed.getCheckpointID());
 
-						triggerQueuedRequests();
-					}
-				} else {
-					// checkpoint did not accept message
-					LOG.error("Received duplicate or invalid acknowledge message for checkpoint {} , task {}",
-							checkpointId, message.getTaskExecutionId());
+							triggerQueuedRequests();
+						}
+						break;
+					case DUPLICATE:
+						LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
+							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
+						break;
+					case UNKNOWN:
+						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
+								"because the task's execution attempt id was unknown. Discarding " +
+								"the state handle to avoid lingering state.", message.getCheckpointId(),
+							message.getTaskExecutionId(), message.getJob());
+
+						try {
+							message.getSubtaskState().discardState();
+						} catch (Exception e) {
+							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+						}
+						break;
+					case DISCARDED:
+						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
+							"because the pending checkpoint had been discarded. Discarding the " +
+								"state handle tp avoid lingering state.",
+							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
+
+						try {
+							message.getSubtaskState().discardState();
+						} catch (Exception e) {
+							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+						}
 				}
 			}
 			else if (checkpoint != null) {
@@ -678,11 +705,20 @@ public class CheckpointCoordinator {
 				// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					isPendingCheckpoint = true;
-					LOG.warn("Received late message for now expired checkpoint attempt " + checkpointId);
+					LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId);
 				}
 				else {
+					LOG.debug("Received message for an unknown checkpoint {}.", checkpointId);
 					isPendingCheckpoint = false;
 				}
+
+				try {
+					// try to discard the state so that we don't have lingering state lying around
+					message.getSubtaskState().discardState();
+				} catch (Exception e) {
+					LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
+						message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/72b295b3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 43a2557..5034502 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,7 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,6 +66,9 @@ public class PendingCheckpoint {
 
 	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 
+	/** Set of acknowledged tasks */
+	private final Set<ExecutionAttemptID> acknowledgedTasks;
+
 	/** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */
 	private final boolean isPeriodic;
 
@@ -109,6 +114,8 @@ public class PendingCheckpoint {
 
 		checkArgument(verticesToConfirm.size() > 0,
 				"Checkpoint needs at least one vertex that commits the checkpoint");
+
+		acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -228,24 +235,37 @@ public class PendingCheckpoint {
 			}
 		}
 	}
-	
-	public boolean acknowledgeTask(
-			ExecutionAttemptID attemptID,
-			SubtaskState checkpointedSubtaskState) {
+
+	/**
+	 * Acknowledges the task with the given execution attempt id and the given subtask state.
+	 *
+	 * @param executionAttemptId of the acknowledged task
+	 * @param subtaskState of the acknowledged task
+	 * @return TaskAcknowledgeResult of the operation
+	 */
+	public TaskAcknowledgeResult acknowledgeTask(
+			ExecutionAttemptID executionAttemptId,
+			SubtaskState subtaskState) {
 
 		synchronized (lock) {
 
 			if (discarded) {
-				return false;
+				return TaskAcknowledgeResult.DISCARDED;
 			}
 
-			final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
+			final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);
 
 			if (vertex == null) {
-				return false;
+				if (acknowledgedTasks.contains(executionAttemptId)) {
+					return TaskAcknowledgeResult.DUPLICATE;
+				} else {
+					return TaskAcknowledgeResult.UNKNOWN;
+				}
+			} else {
+				acknowledgedTasks.add(executionAttemptId);
 			}
 
-			if (null != checkpointedSubtaskState) {
+			if (null != subtaskState) {
 
 				JobVertexID jobVertexID = vertex.getJobvertexId();
 				int subtaskIndex = vertex.getParallelSubtaskIndex();
@@ -253,9 +273,9 @@ public class PendingCheckpoint {
 
 				if (null == taskState) {
 					ChainedStateHandle<StreamStateHandle> nonPartitionedState =
-							checkpointedSubtaskState.getLegacyOperatorState();
+							subtaskState.getLegacyOperatorState();
 					ChainedStateHandle<OperatorStateHandle> partitioneableState =
-							checkpointedSubtaskState.getManagedOperatorState();
+							subtaskState.getManagedOperatorState();
 					//TODO this should go away when we remove chained state, assigning state to operators directly instead
 					int chainLength;
 					if (nonPartitionedState != null) {
@@ -276,17 +296,27 @@ public class PendingCheckpoint {
 				}
 
 				long duration = System.currentTimeMillis() - checkpointTimestamp;
-				checkpointedSubtaskState.setDuration(duration);
+				subtaskState.setDuration(duration);
 
-				taskState.putState(subtaskIndex, checkpointedSubtaskState);
+				taskState.putState(subtaskIndex, subtaskState);
 			}
 
 			++numAcknowledgedTasks;
 
-			return true;
+			return TaskAcknowledgeResult.SUCCESS;
 		}
 	}
 
+	/**
+	 * Result of the {@link PendingCheckpoint#acknowledgeTask} method.
+	 *
+	 * <p>Only {@link #SUCCESS} means that the given subtask state was recorded in this pending
+	 * checkpoint. For the other results the state was not taken over by the checkpoint, so the
+	 * caller decides what to do with it (the checkpoint coordinator discards it for
+	 * {@link #UNKNOWN} and {@link #DISCARDED}, and only logs {@link #DUPLICATE}).
+	 */
+	public enum TaskAcknowledgeResult {
+		SUCCESS, // successful acknowledge of the task; its subtask state (if any) was recorded
+		DUPLICATE, // acknowledge message is a duplicate: the execution attempt was already acknowledged
+		UNKNOWN, // unknown task acknowledged: the execution attempt is neither pending nor acknowledged
+		DISCARDED // pending checkpoint has been discarded before the acknowledgement arrived
+	}
+
 	// ------------------------------------------------------------------------
 	//  Cancellation
 	// ------------------------------------------------------------------------
@@ -350,6 +380,7 @@ public class PendingCheckpoint {
 			} finally {
 				taskStates.clear();
 				notYetAcknowledgedTasks.clear();
+				acknowledgedTasks.clear();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/72b295b3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5c50c02..a59ffa2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -84,6 +84,7 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -996,6 +997,112 @@ public class CheckpointCoordinatorTest {
 		}
 	}
 
+	/**
+	 * Tests that late acknowledge checkpoint messages are properly cleaned up. Furthermore, it tests
+	 * that acknowledge messages from unknown execution attempts of the same job are cleaned up as
+	 * well. In contrast, acknowledge messages from other jobs must not be touched. A late acknowledge
+	 * message is an acknowledge message which arrives after the checkpoint has been declined.
+	 *
+	 * <p>Note that Mockito's {@code verify(mock, times(n))} counts all invocations made on the mock
+	 * so far, so repeated checks on the same mock (e.g. {@code triggerSubtaskState},
+	 * {@code differentJobSubtaskState}) are cumulative.
+	 *
+	 * @throws Exception propagated from the checkpoint coordinator calls
+	 */
+	@Test
+	public void testStateCleanupForLateOrUnknownMessages() throws Exception {
+		final JobID jobId = new JobID();
+
+		final ExecutionAttemptID triggerAttemptId = new ExecutionAttemptID();
+		final ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptId);
+
+		final ExecutionAttemptID ackAttemptId1 = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptId1);
+
+		final ExecutionAttemptID ackAttemptId2 = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptId2);
+
+		final long timestamp = 1L;
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jobId,
+			20000L,
+			20000L,
+			0L,
+			1,
+			ExternalizedCheckpointSettings.none(),
+			new ExecutionVertex[] { triggerVertex },
+			new ExecutionVertex[] {triggerVertex, ackVertex1, ackVertex2},
+			new ExecutionVertex[0],
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			null,
+			new DisabledCheckpointStatsTracker());
+
+		assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+		assertEquals(1, coord.getNumberOfPendingCheckpoints());
+
+		PendingCheckpoint pendingCheckpoint = coord.getPendingCheckpoints().values().iterator().next();
+
+		long checkpointId = pendingCheckpoint.getCheckpointId();
+
+		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
+
+		SubtaskState triggerSubtaskState = mock(SubtaskState.class);
+
+		// acknowledge the first trigger vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState));
+
+		SubtaskState unknownSubtaskState = mock(SubtaskState.class);
+
+		// receive an acknowledge message for an unknown vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState));
+
+		// we should discard acknowledge messages from an unknown vertex belonging to our job
+		verify(unknownSubtaskState, times(1)).discardState();
+
+		SubtaskState differentJobSubtaskState = mock(SubtaskState.class);
+
+		// receive an acknowledge message from an unknown job
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState));
+
+		// we should not interfere with different jobs
+		verify(differentJobSubtaskState, never()).discardState();
+
+		// duplicate acknowledge message for the trigger vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointMetaData, triggerSubtaskState));
+
+		// duplicate acknowledge messages for a known vertex should not trigger discarding the state
+		verify(triggerSubtaskState, never()).discardState();
+
+		// let the checkpoint fail at the first ack vertex
+		// (the decline is expected to discard the pending checkpoint together with the state
+		// acknowledged so far; both effects are checked right below)
+		coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId));
+
+		assertTrue(pendingCheckpoint.isDiscarded());
+
+		// check that we've cleaned up the already acknowledged state
+		verify(triggerSubtaskState, times(1)).discardState();
+
+		SubtaskState ackSubtaskState = mock(SubtaskState.class);
+
+		// late acknowledge message from the second ack vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, ackAttemptId2, checkpointMetaData, ackSubtaskState));
+
+		// check that we also cleaned up this state
+		verify(ackSubtaskState, times(1)).discardState();
+
+		// receive an acknowledge message from an unknown job
+		// (the same mock is reused, so never() below covers both messages from the foreign job)
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), new ExecutionAttemptID(), checkpointMetaData, differentJobSubtaskState));
+
+		// we should not interfere with different jobs
+		verify(differentJobSubtaskState, never()).discardState();
+
+		SubtaskState unknownSubtaskState2 = mock(SubtaskState.class);
+
+		// receive an acknowledge message for an unknown vertex
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, new ExecutionAttemptID(), checkpointMetaData, unknownSubtaskState2));
+
+		// we should discard acknowledge messages from an unknown vertex belonging to our job
+		verify(unknownSubtaskState2, times(1)).discardState();
+	}
+
 	@Test
 	public void testPeriodicTriggering() {
 		try {


[3/3] flink git commit: [FLINK-5057] [taskmanager] Read cancellation timeout from task manager config

Posted by se...@apache.org.
[FLINK-5057] [taskmanager] Read cancellation timeout from task manager config

This closes #2793


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf06a1cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf06a1cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf06a1cc

Branch: refs/heads/master
Commit: bf06a1cc786c0a7b8c8d446be01a63edf2cc0897
Parents: a5ec849
Author: Ufuk Celebi <uc...@apache.org>
Authored: Sat Nov 12 20:19:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 17 00:34:55 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java   | 5 +++--
 .../org/apache/flink/runtime/taskmanager/TaskStopTest.java     | 5 ++++-
 .../java/org/apache/flink/runtime/taskmanager/TaskTest.java    | 6 +++---
 3 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b960e68..3254fc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -299,8 +299,9 @@ public class Task implements Runnable, TaskActions {
 		this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();
 		this.taskStateHandles = taskStateHandles;
 
-		this.taskCancellationInterval = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
-		this.taskCancellationTimeout = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
+		Configuration tmConfig = taskManagerConfig.getConfiguration();
+		this.taskCancellationInterval = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
+		this.taskCancellationTimeout = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
 
 		this.memoryManager = Preconditions.checkNotNull(memManager);
 		this.ioManager = Preconditions.checkNotNull(ioManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 276e090..d80dab3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -64,6 +64,9 @@ public class TaskStopTest {
 		TaskInfo taskInfoMock = mock(TaskInfo.class);
 		when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName");
 
+		TaskManagerRuntimeInfo tmRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
+		when(tmRuntimeInfo.getConfiguration()).thenReturn(new Configuration());
+
 		task = new Task(
 			mock(JobInformation.class),
 			new TaskInformation(
@@ -89,7 +92,7 @@ public class TaskStopTest {
 			mock(CheckpointResponder.class),
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
-			mock(TaskManagerRuntimeInfo.class),
+			tmRuntimeInfo,
 			mock(TaskMetricGroup.class),
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionStateChecker.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5d26050..8177bf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -781,7 +781,7 @@ public class TaskTest extends TestLogger {
 		ResultPartitionConsumableNotifier consumableNotifier,
 		PartitionStateChecker partitionStateChecker,
 		Executor executor,
-		Configuration taskConfig,
+		Configuration taskManagerConfig,
 		ExecutionConfig execConfig) throws IOException {
 		
 		JobID jobId = new JobID();
@@ -813,7 +813,7 @@ public class TaskTest extends TestLogger {
 			1,
 			1,
 			invokable.getName(),
-			taskConfig);
+			new Configuration());
 		
 		return new Task(
 			jobInformation,
@@ -834,7 +834,7 @@ public class TaskTest extends TestLogger {
 			checkpointResponder,
 			libCache,
 			mock(FileCache.class),
-			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+			new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
 			mock(TaskMetricGroup.class),
 			consumableNotifier,
 			partitionStateChecker,