You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/09 20:27:21 UTC

[3/4] flink git commit: [tests] Add comments to recovery tests

[tests] Add comments to recovery tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9edc804e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9edc804e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9edc804e

Branch: refs/heads/master
Commit: 9edc804e15d0155450ef2b7f710a125545f94062
Parents: 0b15bc3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 14:15:25 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100

----------------------------------------------------------------------
 .../flink/test/recovery/SimpleRecoveryITCase.java     |  4 ++++
 .../recovery/TaskManagerFailureRecoveryITCase.java    | 14 ++++++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 8330109..df6fbba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -37,6 +37,10 @@ import java.util.List;
 
 import static org.junit.Assert.*;
 
+/**
+ * A series of tests (reusing one FlinkMiniCluster) where tasks fail (one or more times)
+ * and the recovery should restart them; the tests verify that the job still completes.
+ */
 @SuppressWarnings("serial")
 public class SimpleRecoveryITCase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 85856ba..eb04234 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -45,6 +45,18 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
+/**
+ * This test verifies the behavior of the recovery in the case when a TaskManager
+ * fails (is shut down) in the middle of a job execution.
+ *
+ * <p>The test works with multiple in-process task managers. Initially, it starts a JobManager
+ * and two TaskManagers with 2 slots each. It submits a program with parallelism 4
+ * and waits until all tasks are brought up (coordination between the test and the tasks
+ * happens via shared blocking queues). It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed) and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+@SuppressWarnings("serial")
 public class TaskManagerFailureRecoveryITCase {
 
 	@Test
@@ -165,11 +177,13 @@ public class TaskManagerFailureRecoveryITCase {
 	}
 
 	private static class FailingMapper<T> extends RichMapFunction<T, T> {
+		private static final long serialVersionUID = 4435412404173331157L;
 
 		private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>();
 
 		private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>();
 
+
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			TASK_TO_COORD_QUEUE.add(new Object());