You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/29 06:39:34 UTC

[flink] branch master updated: [FLINK-12413][runtime] Implement ExecutionFailureHandler

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 07e75de  [FLINK-12413][runtime] Implement ExecutionFailureHandler
07e75de is described below

commit 07e75de29969503b7feb639771d86ada6f5fbdf0
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Wed May 29 14:39:21 2019 +0800

    [FLINK-12413][runtime] Implement ExecutionFailureHandler
    
    * Implement ExecutionFailureHandler
    
    * Throws exception when getting tasks or restart delay from the failure handling result when the restarting is suppressed; Renames verticesToBeRestarted to verticesToRestart
    
    * Address the comments
    
    * make verticesToRestart in FailureHandlingResult unmodifiable
    
    * Support checking for nested unrecoverable throwable; address comments
---
 .../failover/flip1/ExecutionFailureHandler.java    |  86 ++++++++
 .../failover/flip1/FailoverStrategy.java           |  18 ++
 .../failover/flip1/FailureHandlingResult.java      | 137 +++++++++++++
 .../failover/flip1/RestartBackoffTimeStrategy.java |  62 ++++++
 .../runtime/throwable/ThrowableClassifier.java     |  27 +++
 .../executiongraph/ThrowableClassifierTest.java    |  33 ++++
 .../flip1/ExecutionFailureHandlerTest.java         | 219 +++++++++++++++++++++
 .../failover/flip1/FailureHandlingResultTest.java  |  85 ++++++++
 8 files changed, 667 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
new file mode 100644
index 0000000..3225992
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.throwable.ThrowableClassifier;
+import org.apache.flink.runtime.throwable.ThrowableType;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Turns task failures into a {@link FailureHandlingResult}, which either names the tasks to restart
+ * in order to recover, or carries the reason why the failure cannot be recovered from.
+ */
+public class ExecutionFailureHandler {
+
+	/** Decides which tasks have to be restarted. */
+	private final FailoverStrategy failoverStrategy;
+
+	/** Decides whether a restart may happen at all and how long to wait before it. */
+	private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
+
+	/**
+	 * Creates a handler for task failures.
+	 *
+	 * @param failoverStrategy selects the tasks to restart when a task fails
+	 * @param restartBackoffTimeStrategy decides whether failed tasks may be restarted, and the restart delay
+	 */
+	public ExecutionFailureHandler(
+		FailoverStrategy failoverStrategy,
+		RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+
+		this.failoverStrategy = checkNotNull(failoverStrategy);
+		this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy);
+	}
+
+	/**
+	 * Decides how to react to a task failure. The result is either a set of task vertices to restart plus
+	 * the delay before restarting them, or a non-recoverable result carrying the reason as a {@link JobException}.
+	 *
+	 * @param failedTask ID of the failed task vertex
+	 * @param cause cause of the task failure
+	 * @return the outcome of the failure handling
+	 */
+	public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
+		if (isUnrecoverableError(cause)) {
+			return FailureHandlingResult.unrecoverable(new JobException("The failure is not recoverable", cause));
+		}
+
+		restartBackoffTimeStrategy.notifyFailure(cause);
+		if (!restartBackoffTimeStrategy.canRestart()) {
+			return FailureHandlingResult.unrecoverable(
+				new JobException("Failed task restarting is suppressed by " + restartBackoffTimeStrategy, cause));
+		}
+
+		return FailureHandlingResult.restartable(
+			failoverStrategy.getTasksNeedingRestart(failedTask, cause),
+			restartBackoffTimeStrategy.getBackoffTime());
+	}
+
+	@VisibleForTesting
+	static boolean isUnrecoverableError(Throwable cause) {
+		final Optional<Throwable> nonRecoverableThrowable =
+			ThrowableClassifier.findThrowableOfThrowableType(cause, ThrowableType.NonRecoverableError);
+		return nonRecoverableThrowable.isPresent();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
index 2fd4ce7..0bd0c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
@@ -34,4 +34,22 @@ public interface FailoverStrategy {
 	 * @return set of IDs of vertices to restart
 	 */
 	Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause);
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory for creating {@link FailoverStrategy} instances.
+	 */
+	interface Factory {
+
+		/**
+		 * Creates a {@link FailoverStrategy} for the given failover topology.
+		 *
+		 * @param topology topology of the graph on which failover is performed
+		 * @return the created failover strategy
+		 */
+		FailoverStrategy create(FailoverTopology topology);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
new file mode 100644
index 0000000..51e487d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Result containing the tasks to restart upon a task failure.
+ * Also contains the reason if the failure is not recoverable (non-recoverable
+ * failure type or restarting suppressed by restart strategy).
+ */
+public class FailureHandlingResult {
+
+	/** Task vertices to restart to recover from the failure (unmodifiable view); null if not restartable. */
+	private final Set<ExecutionVertexID> verticesToRestart;
+
+	/** Delay in ms before the restarting can be conducted; -1 if not restartable. */
+	private final long restartDelayMS;
+
+	/** Reason why the failure is not recoverable; null if the restarting can be conducted. */
+	private final Throwable error;
+
+	/**
+	 * Creates a result of a set of tasks to restart to recover from the failure.
+	 *
+	 * @param verticesToRestart task vertices to restart; wrapped in an unmodifiable view, not copied
+	 * @param restartDelayMS delay in ms before conducting the restart; must not be negative
+	 */
+	private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
+		checkState(restartDelayMS >= 0, "The restart delay must not be negative, but was %s ms.", restartDelayMS);
+
+		this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
+		this.restartDelayMS = restartDelayMS;
+		this.error = null;
+	}
+
+	/**
+	 * Creates a result that the failure is not recoverable and no restarting should be conducted.
+	 *
+	 * @param error reason why the failure is not recoverable
+	 */
+	private FailureHandlingResult(Throwable error) {
+		this.verticesToRestart = null;
+		this.restartDelayMS = -1;
+		this.error = checkNotNull(error);
+	}
+
+	/**
+	 * Returns the tasks to restart. Only allowed if {@link #canRestart()} returns true.
+	 *
+	 * @return unmodifiable set of the tasks to restart
+	 */
+	public Set<ExecutionVertexID> getVerticesToRestart() {
+		if (canRestart()) {
+			return verticesToRestart;
+		} else {
+			throw new IllegalStateException("Cannot get vertices to restart when the restarting is suppressed.");
+		}
+	}
+
+	/**
+	 * Returns the delay before the restarting. Only allowed if {@link #canRestart()} returns true.
+	 *
+	 * @return the delay before the restarting, in ms
+	 */
+	public long getRestartDelayMS() {
+		if (canRestart()) {
+			return restartDelayMS;
+		} else {
+			throw new IllegalStateException("Cannot get restart delay when the restarting is suppressed.");
+		}
+	}
+
+	/**
+	 * Returns the reason why the restarting cannot be conducted. Only allowed if {@link #canRestart()} returns false.
+	 *
+	 * @return reason why the restarting cannot be conducted
+	 */
+	public Throwable getError() {
+		if (canRestart()) {
+			throw new IllegalStateException("Cannot get error when the restarting is accepted.");
+		} else {
+			return error;
+		}
+	}
+
+	/**
+	 * Returns whether the restarting can be conducted, i.e. whether this result carries no error.
+	 *
+	 * @return whether the restarting can be conducted
+	 */
+	public boolean canRestart() {
+		return error == null;
+	}
+
+	/**
+	 * Creates a result of a set of tasks to restart to recover from the failure.
+	 *
+	 * @param verticesToRestart task vertices to restart; wrapped in an unmodifiable view, not copied
+	 * @param restartDelayMS delay in ms before conducting the restart; must not be negative
+	 * @return result of a set of tasks to restart to recover from the failure
+	 */
+	public static FailureHandlingResult restartable(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
+		return new FailureHandlingResult(verticesToRestart, restartDelayMS);
+	}
+
+	/**
+	 * Creates a result that the failure is not recoverable and no restarting should be conducted.
+	 *
+	 * @param error reason why the failure is not recoverable
+	 * @return result indicating the failure is not recoverable
+	 */
+	public static FailureHandlingResult unrecoverable(Throwable error) {
+		return new FailureHandlingResult(error);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategy.java
new file mode 100644
index 0000000..cd7a39b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+/**
+ * Strategy to decide whether to restart failed tasks and the delay before the restarting.
+ */
+public interface RestartBackoffTimeStrategy {
+
+	/**
+	 * Returns whether the failed tasks may be restarted.
+	 *
+	 * @return whether a restart should be conducted
+	 */
+	boolean canRestart();
+
+	/**
+	 * Returns the delay before the restarting.
+	 *
+	 * @return the delay before the restarting; NOTE(review): unit not stated here, presumably ms — confirm
+	 */
+	long getBackoffTime();
+
+	/**
+	 * Notifies the strategy about a task failure and its cause.
+	 *
+	 * @param cause cause of the task failure
+	 */
+	void notifyFailure(Throwable cause);
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory for creating {@link RestartBackoffTimeStrategy} instances.
+	 */
+	interface Factory {
+
+		/**
+		 * Creates a {@link RestartBackoffTimeStrategy}.
+		 *
+		 * @return the created restart backoff time strategy
+		 */
+		RestartBackoffTimeStrategy create();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
index 4a17b5f..a08c28f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.throwable;
 
+import java.util.Optional;
+
 /**
  * Helper class, given a exception do the classification.
  */
@@ -33,4 +35,29 @@ public class ThrowableClassifier {
 		final ThrowableAnnotation annotation = cause.getClass().getAnnotation(ThrowableAnnotation.class);
 		return annotation == null ? ThrowableType.RecoverableError : annotation.value();
 	}
+
+	/**
+	 * Returns the first throwable in the cause chain whose class carries a matching {@link ThrowableAnnotation}.
+	 *
+	 * @param throwable head of the throwable chain to check; may be null
+	 * @param throwableType the throwable type to search for in the chain; may be null
+	 * @return the first throwable of the given type, or empty if there is none or either argument is null
+	 */
+	public static Optional<Throwable> findThrowableOfThrowableType(Throwable throwable, ThrowableType throwableType) {
+		if (throwable == null || throwableType == null) {
+			return Optional.empty();
+		}
+		// unlike getThrowableType, an un-annotated throwable never matches, not even RecoverableError
+		Throwable t = throwable;
+		while (t != null) {
+			final ThrowableAnnotation annotation = t.getClass().getAnnotation(ThrowableAnnotation.class);
+			if (annotation != null && annotation.value() == throwableType) {
+				return Optional.of(t);
+			} else {
+				t = t.getCause();
+			}
+		}
+		// NOTE(review): a cyclic cause chain (e.g. A -> B -> A) without a match would never leave the loop above
+		return Optional.empty();
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
index 57b330e..517321a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test throwable classifier
@@ -68,6 +70,34 @@ public class ThrowableClassifierTest extends TestLogger {
 			ThrowableClassifier.getThrowableType(new Sub_ThrowableType_PartitionDataMissingError_Exception()));
 	}
 
+	@Test
+	public void testFindThrowableOfThrowableType() {
+		// throwable without any throwable type annotation
+		assertFalse(ThrowableClassifier.findThrowableOfThrowableType(
+			new Exception(),
+			ThrowableType.RecoverableError).isPresent());
+
+		// throwable annotated with a different throwable type
+		assertFalse(ThrowableClassifier.findThrowableOfThrowableType(
+			new ThrowableType_PartitionDataMissingError_Exception(),
+			ThrowableType.RecoverableError).isPresent());
+
+		// throwable directly annotated with the searched type
+		assertTrue(ThrowableClassifier.findThrowableOfThrowableType(
+			new ThrowableType_RecoverableFailure_Exception(),
+			ThrowableType.RecoverableError).isPresent());
+
+		// annotated throwable nested as the cause of an un-annotated one
+		assertTrue(ThrowableClassifier.findThrowableOfThrowableType(
+			new Exception(new ThrowableType_RecoverableFailure_Exception()),
+			ThrowableType.RecoverableError).isPresent());
+
+		// subclass of an annotated throwable
+		assertTrue(ThrowableClassifier.findThrowableOfThrowableType(
+			new Sub_ThrowableType_RecoverableFailure_Exception(),
+			ThrowableType.RecoverableError).isPresent());
+	}
+
 	@ThrowableAnnotation(ThrowableType.PartitionDataMissingError)
 	private class ThrowableType_PartitionDataMissingError_Exception extends Exception {
 	}
@@ -82,4 +112,7 @@ public class ThrowableClassifierTest extends TestLogger {
 
 	private class Sub_ThrowableType_PartitionDataMissingError_Exception extends ThrowableType_PartitionDataMissingError_Exception {
 	}
+
+	private class Sub_ThrowableType_RecoverableFailure_Exception extends ThrowableType_RecoverableFailure_Exception {
+	} // subclass of an annotated exception; used to check that the annotation is also found on subclasses
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
new file mode 100644
index 0000000..ed37496
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ExecutionFailureHandler}.
+ */
+public class ExecutionFailureHandlerTest extends TestLogger {
+
+	/**
+	 * Tests the case that task restarting is accepted.
+	 */
+	@Test
+	public void testNormalFailureHandling() {
+		// failover strategy which always suggests restarting the given tasks
+		Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
+		tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
+		FailoverStrategy failoverStrategy = new TestFailoverStrategy(tasksToRestart);
+
+		// restart strategy which accepts restarting
+		boolean canRestart = true;
+		long restartDelayMs = 1234;
+		RestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
+		ExecutionFailureHandler executionFailureHandler = new ExecutionFailureHandler(failoverStrategy, restartStrategy);
+
+		// trigger a task failure
+		FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
+			new ExecutionVertexID(new JobVertexID(), 0),
+			new Exception("test failure"));
+
+		// verify results
+		assertTrue(result.canRestart());
+		assertEquals(restartDelayMs, result.getRestartDelayMS());
+		assertEquals(tasksToRestart, result.getVerticesToRestart());
+		try {
+			result.getError();
+			fail("Cannot get error when the restarting is accepted");
+		} catch (IllegalStateException ex) {
+			// expected
+		}
+	}
+
+	/**
+	 * Tests the case that task restarting is suppressed.
+	 */
+	@Test
+	public void testRestartingSuppressedFailureHandlingResult() {
+		// failover strategy which always suggests restarting the given tasks
+		Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
+		tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
+		FailoverStrategy failoverStrategy = new TestFailoverStrategy(tasksToRestart);
+
+		// restart strategy which suppresses restarting
+		boolean canRestart = false;
+		long restartDelayMs = 1234;
+		RestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
+		ExecutionFailureHandler executionFailureHandler = new ExecutionFailureHandler(failoverStrategy, restartStrategy);
+
+		// trigger a task failure
+		FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
+			new ExecutionVertexID(new JobVertexID(), 0),
+			new Exception("test failure"));
+
+		// verify results
+		assertFalse(result.canRestart());
+		assertNotNull(result.getError());
+		assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
+		try {
+			result.getVerticesToRestart();
+			fail("get tasks to restart is not allowed when restarting is suppressed");
+		} catch (IllegalStateException ex) {
+			// expected
+		}
+		try {
+			result.getRestartDelayMS();
+			fail("get restart delay is not allowed when restarting is suppressed");
+		} catch (IllegalStateException ex) {
+			// expected
+		}
+	}
+
+	/**
+	 * Tests the case that the failure is non-recoverable type.
+	 */
+	@Test
+	public void testNonRecoverableFailureHandlingResult() {
+		// failover strategy which always suggests restarting the given tasks
+		Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
+		tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
+		FailoverStrategy failoverStrategy = new TestFailoverStrategy(tasksToRestart);
+
+		// restart strategy which accepts restarting
+		boolean canRestart = true;
+		long restartDelayMs = 1234;
+		RestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
+		ExecutionFailureHandler executionFailureHandler = new ExecutionFailureHandler(failoverStrategy, restartStrategy);
+
+		// trigger an unrecoverable task failure
+		FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
+			new ExecutionVertexID(new JobVertexID(), 0),
+			new Exception(new SuppressRestartsException(new Exception("test failure"))));
+
+		// verify results
+		assertFalse(result.canRestart());
+		assertNotNull(result.getError());
+		assertTrue(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
+		try {
+			result.getVerticesToRestart();
+			fail("get tasks to restart is not allowed when restarting is suppressed");
+		} catch (IllegalStateException ex) {
+			// expected
+		}
+		try {
+			result.getRestartDelayMS();
+			fail("get restart delay is not allowed when restarting is suppressed");
+		} catch (IllegalStateException ex) {
+			// expected
+		}
+	}
+
+	/**
+	 * Tests the check for unrecoverable error.
+	 */
+	@Test
+	public void testUnrecoverableErrorCheck() {
+		// normal error
+		assertFalse(ExecutionFailureHandler.isUnrecoverableError(new Exception()));
+
+		// direct unrecoverable error
+		assertTrue(ExecutionFailureHandler.isUnrecoverableError(new SuppressRestartsException(new Exception())));
+
+		// nested unrecoverable error
+		assertTrue(ExecutionFailureHandler.isUnrecoverableError(
+			new Exception(new SuppressRestartsException(new Exception()))));
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A FailoverStrategy implementation for tests. It always suggests restarting the task set given on construction.
+	 */
+	private static class TestFailoverStrategy implements FailoverStrategy {
+
+		private final Set<ExecutionVertexID> tasksToRestart;
+
+		public TestFailoverStrategy(Set<ExecutionVertexID> tasksToRestart) {
+			this.tasksToRestart = checkNotNull(tasksToRestart);
+		}
+
+		@Override
+		public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
+			return tasksToRestart;
+		}
+	}
+
+	/**
+	 * A RestartBackoffTimeStrategy implementation for tests, with a restart decision and backoff time fixed on construction.
+	 */
+	private static class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
+
+		private final boolean canRestart;
+
+		private final long backoffTime;
+
+		public TestRestartBackoffTimeStrategy(boolean canRestart, long backoffTime) {
+			this.canRestart = canRestart;
+			this.backoffTime = backoffTime;
+		}
+
+		@Override
+		public boolean canRestart() {
+			return canRestart;
+		}
+
+		@Override
+		public long getBackoffTime() {
+			return backoffTime;
+		}
+
+		@Override
+		public void notifyFailure(Throwable cause) {
+			// ignored: the decision and backoff time are fixed on construction
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
new file mode 100644
index 0000000..8943655
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link FailureHandlingResult}.
+ */
+public class FailureHandlingResultTest extends TestLogger {
+
+	/**
+	 * Tests a FailureHandlingResult that allows restarting.
+	 */
+	@Test
+	public void testNormalFailureHandlingResult() {
+		// create a restartable FailureHandlingResult
+		Set<ExecutionVertexID> tasks = new HashSet<>();
+		tasks.add(new ExecutionVertexID(new JobVertexID(), 0));
+		long delay = 1234;
+		FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay);
+
+		assertTrue(result.canRestart());
+		assertEquals(delay, result.getRestartDelayMS());
+		assertEquals(tasks, result.getVerticesToRestart());
+		try {
+			result.getError();
+			fail("Cannot get error when the restarting is accepted");
+		} catch (IllegalStateException ex) {
+			// expected: a restartable result carries no error
+		}
+	}
+
+	/**
+	 * Tests an unrecoverable FailureHandlingResult, which suppresses restarts.
+	 */
+	@Test
+	public void testRestartingSuppressedFailureHandlingResult() {
+		// create an unrecoverable FailureHandlingResult carrying an error
+		Throwable error = new Exception("test error");
+		FailureHandlingResult result = FailureHandlingResult.unrecoverable(error);
+
+		assertFalse(result.canRestart());
+		assertEquals(error, result.getError());
+		try {
+			result.getVerticesToRestart();
+			fail("get tasks to restart is not allowed when restarting is suppressed");
+		} catch (IllegalStateException ex) {
+			// expected: no vertices to restart on an unrecoverable result
+		}
+		try {
+			result.getRestartDelayMS();
+			fail("get restart delay is not allowed when restarting is suppressed");
+		} catch (IllegalStateException ex) {
+			// expected: no restart delay on an unrecoverable result
+		}
+	}
+}