You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/29 06:39:34 UTC
[flink] branch master updated: [FLINK-12413][runtime] Implement
ExecutionFailureHandler
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 07e75de [FLINK-12413][runtime] Implement ExecutionFailureHandler
07e75de is described below
commit 07e75de29969503b7feb639771d86ada6f5fbdf0
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Wed May 29 14:39:21 2019 +0800
[FLINK-12413][runtime] Implement ExecutionFailureHandler
* Implement ExecutionFailureHandler
* Throws exception when getting tasks or restart delay from the failure handling result when the restarting is suppressed; Renames verticesToBeRestarted to verticesToRestart
* Address the comments
* make verticesToRestart in FailureHandlingResult unmodifiable
* Support checking for nested unrecoverable throwable; address comments
---
.../failover/flip1/ExecutionFailureHandler.java | 86 ++++++++
.../failover/flip1/FailoverStrategy.java | 18 ++
.../failover/flip1/FailureHandlingResult.java | 137 +++++++++++++
.../failover/flip1/RestartBackoffTimeStrategy.java | 62 ++++++
.../runtime/throwable/ThrowableClassifier.java | 27 +++
.../executiongraph/ThrowableClassifierTest.java | 33 ++++
.../flip1/ExecutionFailureHandlerTest.java | 219 +++++++++++++++++++++
.../failover/flip1/FailureHandlingResultTest.java | 85 ++++++++
8 files changed, 667 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
new file mode 100644
index 0000000..3225992
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.throwable.ThrowableClassifier;
+import org.apache.flink.runtime.throwable.ThrowableType;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Maps a task failure to a {@link FailureHandlingResult}, i.e. either the tasks to restart
+ * (plus the delay before restarting them) or the reason why no restart will take place.
+ */
+public class ExecutionFailureHandler {
+
+    /** Determines the set of tasks that must be restarted for a failed task. */
+    private final FailoverStrategy failoverStrategy;
+
+    /** Determines whether a restart is permitted and how long to wait before it. */
+    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
+
+    /**
+     * Creates the handler to deal with task failures.
+     *
+     * @param failoverStrategy picks the tasks to restart on a task failure, must not be null
+     * @param restartBackoffTimeStrategy decides whether and when to restart, must not be null
+     */
+    public ExecutionFailureHandler(
+            FailoverStrategy failoverStrategy,
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+
+        this.failoverStrategy = checkNotNull(failoverStrategy);
+        this.restartBackoffTimeStrategy = checkNotNull(restartBackoffTimeStrategy);
+    }
+
+    /**
+     * Handles a task failure. An unrecoverable cause is rejected without consulting the restart
+     * strategy; otherwise the strategy is notified and asked whether a restart may happen.
+     *
+     * @param failedTask is the ID of the failed task vertex
+     * @param cause of the task failure
+     * @return tasks to restart and the delay, or the error explaining why no restart happens
+     */
+    public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
+        if (isUnrecoverableError(cause)) {
+            return FailureHandlingResult.unrecoverable(new JobException("The failure is not recoverable", cause));
+        }
+
+        restartBackoffTimeStrategy.notifyFailure(cause);
+        if (!restartBackoffTimeStrategy.canRestart()) {
+            final String reason = "Failed task restarting is suppressed by " + restartBackoffTimeStrategy;
+            return FailureHandlingResult.unrecoverable(new JobException(reason, cause));
+        }
+
+        return FailureHandlingResult.restartable(
+            failoverStrategy.getTasksNeedingRestart(failedTask, cause),
+            restartBackoffTimeStrategy.getBackoffTime());
+    }
+
+    @VisibleForTesting
+    static boolean isUnrecoverableError(Throwable cause) {
+        final Optional<Throwable> match =
+            ThrowableClassifier.findThrowableOfThrowableType(cause, ThrowableType.NonRecoverableError);
+        return match.isPresent();
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
index 2fd4ce7..0bd0c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailoverStrategy.java
@@ -34,4 +34,22 @@ public interface FailoverStrategy {
* @return set of IDs of vertices to restart
*/
Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause);
+
+ // ------------------------------------------------------------------------
+ // factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory for {@link FailoverStrategy} instances created for a given {@link FailoverTopology}.
+ */
+ interface Factory {
+
+ /**
+ * Instantiates the {@link FailoverStrategy} for the given topology.
+ *
+ * @param topology the topology of the graph that the strategy performs failover on
+ * @return the instantiated failover strategy
+ */
+ FailoverStrategy create(FailoverTopology topology);
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
new file mode 100644
index 0000000..51e487d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Outcome of handling a task failure: either the tasks to restart and the delay to wait before
+ * restarting them, or the error explaining why no restart will happen (non-recoverable failure
+ * type, or restarting suppressed by the restart strategy).
+ */
+public class FailureHandlingResult {
+
+    /** Task vertices to restart; {@code null} when the failure is not recoverable. */
+    private final Set<ExecutionVertexID> verticesToRestart;
+
+    /** Delay before the restart can be conducted; {@code -1} when the failure is not recoverable. */
+    private final long restartDelayMS;
+
+    /** Reason why the failure is not recoverable; {@code null} when a restart is possible. */
+    private final Throwable error;
+
+    /**
+     * Creates a result describing a set of tasks to restart to recover from the failure.
+     *
+     * @param verticesToRestart task vertices to restart, exposed through an unmodifiable view
+     * @param restartDelayMS delay before conducting the restart, must not be negative
+     */
+    private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
+        checkState(restartDelayMS >= 0);
+
+        this.error = null;
+        this.restartDelayMS = restartDelayMS;
+        this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
+    }
+
+    /**
+     * Creates a result stating that the failure is not recoverable and nothing will be restarted.
+     *
+     * @param error reason why the failure is not recoverable, must not be null
+     */
+    private FailureHandlingResult(Throwable error) {
+        this.error = checkNotNull(error);
+        this.verticesToRestart = null;
+        this.restartDelayMS = -1;
+    }
+
+    /**
+     * Returns the tasks to restart.
+     *
+     * @return the tasks to restart
+     * @throws IllegalStateException if the restarting is suppressed
+     */
+    public Set<ExecutionVertexID> getVerticesToRestart() {
+        if (!canRestart()) {
+            throw new IllegalStateException("Cannot get vertices to restart when the restarting is suppressed.");
+        }
+        return verticesToRestart;
+    }
+
+    /**
+     * Returns the delay before the restarting.
+     *
+     * @return the delay before the restarting
+     * @throws IllegalStateException if the restarting is suppressed
+     */
+    public long getRestartDelayMS() {
+        if (!canRestart()) {
+            throw new IllegalStateException("Cannot get restart delay when the restarting is suppressed.");
+        }
+        return restartDelayMS;
+    }
+
+    /**
+     * Returns the reason why the restarting cannot be conducted.
+     *
+     * @return reason why the restarting cannot be conducted
+     * @throws IllegalStateException if the restarting is accepted, i.e. there is no error
+     */
+    public Throwable getError() {
+        if (canRestart()) {
+            throw new IllegalStateException("Cannot get error when the restarting is accepted.");
+        }
+        return error;
+    }
+
+    /**
+     * Tells whether the restarting can be conducted, which is the case when no error was recorded.
+     *
+     * @return whether the restarting can be conducted
+     */
+    public boolean canRestart() {
+        return error == null;
+    }
+
+    /**
+     * Builds a result carrying the tasks to restart to recover from the failure.
+     *
+     * @param verticesToRestart containing task vertices to restart to recover from the failure
+     * @param restartDelayMS indicate a delay before conducting the restart
+     * @return result of a set of tasks to restart to recover from the failure
+     */
+    public static FailureHandlingResult restartable(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
+        return new FailureHandlingResult(verticesToRestart, restartDelayMS);
+    }
+
+    /**
+     * Builds a result stating that the failure is not recoverable and nothing should be restarted.
+     *
+     * @param error reason why the failure is not recoverable
+     * @return result indicating the failure is not recoverable
+     */
+    public static FailureHandlingResult unrecoverable(Throwable error) {
+        return new FailureHandlingResult(error);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategy.java
new file mode 100644
index 0000000..cd7a39b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartBackoffTimeStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+/**
+ * Strategy that decides whether failed tasks may be restarted and how long to wait before doing so.
+ */
+public interface RestartBackoffTimeStrategy {
+
+ /**
+ * Returns whether a restart should be conducted.
+ *
+ * @return {@code true} if a restart should be conducted, {@code false} if it is suppressed
+ */
+ boolean canRestart();
+
+ /**
+ * Returns the delay to wait before the restarting is conducted.
+ *
+ * @return the delay to wait before the restarting
+ */
+ long getBackoffTime();
+
+ /**
+ * Notifies the strategy about the cause of a task failure.
+ *
+ * @param cause of the task failure
+ */
+ void notifyFailure(Throwable cause);
+
+ // ------------------------------------------------------------------------
+ // factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory for {@link RestartBackoffTimeStrategy} instances.
+ */
+ interface Factory {
+
+ /**
+ * Instantiates the {@link RestartBackoffTimeStrategy}.
+ *
+ * @return the instantiated restart strategy
+ */
+ RestartBackoffTimeStrategy create();
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
index 4a17b5f..a08c28f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/throwable/ThrowableClassifier.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.throwable;
+import java.util.Optional;
+
/**
* Helper class, given a exception do the classification.
*/
@@ -33,4 +35,29 @@ public class ThrowableClassifier {
final ThrowableAnnotation annotation = cause.getClass().getAnnotation(ThrowableAnnotation.class);
return annotation == null ? ThrowableType.RecoverableError : annotation.value();
}
+
+    /**
+     * Walks the cause chain of a throwable and returns the first element of the given throwable type.
+     * Throwables whose class carries no {@link ThrowableAnnotation} are skipped, whatever type is
+     * requested.
+     *
+     * @param throwable head of the cause chain to inspect, may be null
+     * @param throwableType the throwable type to look for in the chain, may be null
+     * @return the first throwable whose class is annotated with the given type, otherwise empty
+     */
+    public static Optional<Throwable> findThrowableOfThrowableType(Throwable throwable, ThrowableType throwableType) {
+        // a null throwable never enters the loop below, so only the type needs an explicit guard
+        if (throwableType == null) {
+            return Optional.empty();
+        }
+
+        for (Throwable current = throwable; current != null; current = current.getCause()) {
+            final ThrowableAnnotation marker = current.getClass().getAnnotation(ThrowableAnnotation.class);
+            if (marker != null && marker.value() == throwableType) {
+                return Optional.of(current);
+            }
+        }
+
+        return Optional.empty();
+    }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
index 57b330e..517321a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ThrowableClassifierTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
/**
* Test throwable classifier
@@ -68,6 +70,34 @@ public class ThrowableClassifierTest extends TestLogger {
ThrowableClassifier.getThrowableType(new Sub_ThrowableType_PartitionDataMissingError_Exception()));
}
+ @Test
+ public void testFindThrowableOfThrowableType() {
+ // a plain exception is not found, even when searching for RecoverableError
+ assertFalse(ThrowableClassifier.findThrowableOfThrowableType(
+ new Exception(),
+ ThrowableType.RecoverableError).isPresent());
+
+ // a throwable annotated with a different type (PartitionDataMissingError) is not found
+ assertFalse(ThrowableClassifier.findThrowableOfThrowableType(
+ new ThrowableType_PartitionDataMissingError_Exception(),
+ ThrowableType.RecoverableError).isPresent());
+
+ // direct recoverable throwable is found
+ assertTrue(ThrowableClassifier.findThrowableOfThrowableType(
+ new ThrowableType_RecoverableFailure_Exception(),
+ ThrowableType.RecoverableError).isPresent());
+
+ // recoverable throwable nested as the cause of a plain exception is found
+ assertTrue(ThrowableClassifier.findThrowableOfThrowableType(
+ new Exception(new ThrowableType_RecoverableFailure_Exception()),
+ ThrowableType.RecoverableError).isPresent());
+
+ // subclass of the recoverable throwable is found as well
+ assertTrue(ThrowableClassifier.findThrowableOfThrowableType(
+ new Sub_ThrowableType_RecoverableFailure_Exception(),
+ ThrowableType.RecoverableError).isPresent());
+ }
+
@ThrowableAnnotation(ThrowableType.PartitionDataMissingError)
private class ThrowableType_PartitionDataMissingError_Exception extends Exception {
}
@@ -82,4 +112,7 @@ public class ThrowableClassifierTest extends TestLogger {
private class Sub_ThrowableType_PartitionDataMissingError_Exception extends ThrowableType_PartitionDataMissingError_Exception {
}
+
+ private class Sub_ThrowableType_RecoverableFailure_Exception extends ThrowableType_RecoverableFailure_Exception {
+ } // declares no annotation of its own; used by the "inherit recoverable throwable" test case
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
new file mode 100644
index 0000000..ed37496
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ExecutionFailureHandler}.
+ */
+public class ExecutionFailureHandlerTest extends TestLogger {
+
+ /**
+ * Tests the case that task restarting is accepted.
+ */
+ @Test
+ public void testNormalFailureHandling() {
+ // failover strategy which always suggests restarting the given tasks
+ Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
+ tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
+ FailoverStrategy failoverStrategy = new TestFailoverStrategy(tasksToRestart);
+
+ // restart strategy which accepts restarting
+ boolean canRestart = true;
+ long restartDelayMs = 1234;
+ RestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
+ ExecutionFailureHandler executionFailureHandler = new ExecutionFailureHandler(failoverStrategy, restartStrategy);
+
+ // trigger a task failure
+ FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
+ new ExecutionVertexID(new JobVertexID(), 0),
+ new Exception("test failure"));
+
+ // verify results
+ assertTrue(result.canRestart());
+ assertEquals(restartDelayMs, result.getRestartDelayMS());
+ assertEquals(tasksToRestart, result.getVerticesToRestart());
+ try {
+ result.getError();
+ fail("Cannot get error when the restarting is accepted");
+ } catch (IllegalStateException ex) {
+ // expected
+ }
+ }
+
+ /**
+ * Tests the case that task restarting is suppressed.
+ */
+ @Test
+ public void testRestartingSuppressedFailureHandlingResult() {
+ // failover strategy which always suggests restarting the given tasks
+ Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
+ tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
+ FailoverStrategy failoverStrategy = new TestFailoverStrategy(tasksToRestart);
+
+ // restart strategy which suppresses restarting
+ boolean canRestart = false;
+ long restartDelayMs = 1234;
+ RestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
+ ExecutionFailureHandler executionFailureHandler = new ExecutionFailureHandler(failoverStrategy, restartStrategy);
+
+ // trigger a task failure
+ FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
+ new ExecutionVertexID(new JobVertexID(), 0),
+ new Exception("test failure"));
+
+ // verify results
+ assertFalse(result.canRestart());
+ assertNotNull(result.getError());
+ assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
+ try {
+ result.getVerticesToRestart();
+ fail("get tasks to restart is not allowed when restarting is suppressed");
+ } catch (IllegalStateException ex) {
+ // expected
+ }
+ try {
+ result.getRestartDelayMS();
+ fail("get restart delay is not allowed when restarting is suppressed");
+ } catch (IllegalStateException ex) {
+ // expected
+ }
+ }
+
+ /**
+ * Tests the case that the failure is non-recoverable type.
+ */
+ @Test
+ public void testNonRecoverableFailureHandlingResult() {
+ // failover strategy which always suggests restarting the given tasks
+ Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
+ tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
+ FailoverStrategy failoverStrategy = new TestFailoverStrategy(tasksToRestart);
+
+ // restart strategy which accepts restarting
+ boolean canRestart = true;
+ long restartDelayMs = 1234;
+ RestartBackoffTimeStrategy restartStrategy = new TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
+ ExecutionFailureHandler executionFailureHandler = new ExecutionFailureHandler(failoverStrategy, restartStrategy);
+
+ // trigger an unrecoverable task failure
+ FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult(
+ new ExecutionVertexID(new JobVertexID(), 0),
+ new Exception(new SuppressRestartsException(new Exception("test failure"))));
+
+ // verify results
+ assertFalse(result.canRestart());
+ assertNotNull(result.getError());
+ assertTrue(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
+ try {
+ result.getVerticesToRestart();
+ fail("get tasks to restart is not allowed when restarting is suppressed");
+ } catch (IllegalStateException ex) {
+ // expected
+ }
+ try {
+ result.getRestartDelayMS();
+ fail("get restart delay is not allowed when restarting is suppressed");
+ } catch (IllegalStateException ex) {
+ // expected
+ }
+ }
+
+ /**
+ * Tests the check for unrecoverable error.
+ */
+ @Test
+ public void testUnrecoverableErrorCheck() {
+ // normal error
+ assertFalse(ExecutionFailureHandler.isUnrecoverableError(new Exception()));
+
+ // direct unrecoverable error
+ assertTrue(ExecutionFailureHandler.isUnrecoverableError(new SuppressRestartsException(new Exception())));
+
+ // nested unrecoverable error
+ assertTrue(ExecutionFailureHandler.isUnrecoverableError(
+ new Exception(new SuppressRestartsException(new Exception()))));
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ /**
+ * A FailoverStrategy implementation for tests. It always suggest restarting the given task set on construction.
+ */
+ private class TestFailoverStrategy implements FailoverStrategy {
+
+ private final Set<ExecutionVertexID> tasksToRestart;
+
+ public TestFailoverStrategy(Set<ExecutionVertexID> tasksToRestart) {
+ this.tasksToRestart = checkNotNull(tasksToRestart);
+ }
+
+ @Override
+ public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
+ return tasksToRestart;
+ }
+ }
+
+ /**
+ * A RestartBackoffTimeStrategy implementation for tests.
+ */
+ private class TestRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
+
+ private final boolean canRestart;
+
+ private final long backoffTime;
+
+ public TestRestartBackoffTimeStrategy(boolean canRestart, long backoffTime) {
+ this.canRestart = canRestart;
+ this.backoffTime = backoffTime;
+ }
+
+ @Override
+ public boolean canRestart() {
+ return canRestart;
+ }
+
+ @Override
+ public long getBackoffTime() {
+ return backoffTime;
+ }
+
+ @Override
+ public void notifyFailure(Throwable cause) {
+ // ignore
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
new file mode 100644
index 0000000..8943655
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link FailureHandlingResult}.
+ */
+public class FailureHandlingResultTest extends TestLogger {
+
+ /**
+ * Tests a FailureHandlingResult which accepts restarting.
+ */
+ @Test
+ public void testNormalFailureHandlingResult() {
+ // create a restartable FailureHandlingResult with one task and a fixed delay
+ Set<ExecutionVertexID> tasks = new HashSet<>();
+ tasks.add(new ExecutionVertexID(new JobVertexID(), 0));
+ long delay = 1234;
+ FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay);
+
+ assertTrue(result.canRestart());
+ assertEquals(delay, result.getRestartDelayMS());
+ assertEquals(tasks, result.getVerticesToRestart());
+ try {
+ result.getError();
+ fail("Cannot get error when the restarting is accepted");
+ } catch (IllegalStateException ex) {
+ // expected: there is no error to return when restarting is accepted
+ }
+ }
+
+ /**
+ * Tests a FailureHandlingResult which suppresses restarts.
+ */
+ @Test
+ public void testRestartingSuppressedFailureHandlingResult() {
+ // create an unrecoverable FailureHandlingResult carrying the error
+ Throwable error = new Exception("test error");
+ FailureHandlingResult result = FailureHandlingResult.unrecoverable(error);
+
+ assertFalse(result.canRestart());
+ assertEquals(error, result.getError());
+ try {
+ result.getVerticesToRestart();
+ fail("get tasks to restart is not allowed when restarting is suppressed");
+ } catch (IllegalStateException ex) {
+ // expected: no tasks to restart are available when restarting is suppressed
+ }
+ try {
+ result.getRestartDelayMS();
+ fail("get restart delay is not allowed when restarting is suppressed");
+ } catch (IllegalStateException ex) {
+ // expected: no restart delay is available when restarting is suppressed
+ }
+ }
+}