You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/09 20:27:21 UTC
[3/4] flink git commit: [tests] Add comments and to recovery tests
[tests] Add comments and to recovery tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9edc804e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9edc804e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9edc804e
Branch: refs/heads/master
Commit: 9edc804e15d0155450ef2b7f710a125545f94062
Parents: 0b15bc3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 9 14:15:25 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 9 19:31:05 2015 +0100
----------------------------------------------------------------------
.../flink/test/recovery/SimpleRecoveryITCase.java | 4 ++++
.../recovery/TaskManagerFailureRecoveryITCase.java | 14 ++++++++++++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 8330109..df6fbba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -37,6 +37,10 @@ import java.util.List;
import static org.junit.Assert.*;
+/**
+ * A series of tests (reusing one FlinkMiniCluster) where tasks fail (one or more times)
+ * and the recovery should restart them, so that the job still completes.
+ */
@SuppressWarnings("serial")
public class SimpleRecoveryITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 85856ba..eb04234 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -45,6 +45,18 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
+/**
+ * This test verifies the behavior of the recovery in the case when a TaskManager
+ * fails (is shut down) in the middle of a job execution.
+ *
+ * The test works with multiple in-process task managers. Initially, it starts a JobManager
+ * and two TaskManagers with 2 slots each. It submits a program with parallelism 4
+ * and waits until all tasks are brought up (coordination between the test and the tasks
+ * happens via shared blocking queues). It then starts another TaskManager, which is
+ * guaranteed to remain empty (all tasks are already deployed), and kills one of
+ * the original task managers. The recovery should restart the tasks on the new TaskManager.
+ */
+@SuppressWarnings("serial")
public class TaskManagerFailureRecoveryITCase {
@Test
@@ -165,11 +177,13 @@ public class TaskManagerFailureRecoveryITCase {
}
private static class FailingMapper<T> extends RichMapFunction<T, T> {
+ private static final long serialVersionUID = 4435412404173331157L;
private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>();
private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>();
+
@Override
public void open(Configuration parameters) throws Exception {
TASK_TO_COORD_QUEUE.add(new Object());