You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/11 07:55:47 UTC

[GitHub] [flink] wanglijie95 commented on a diff in pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

wanglijie95 commented on code in PR #20221:
URL: https://github.com/apache/flink/pull/20221#discussion_r917641257


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java:
##########
@@ -71,122 +74,125 @@ public void setUp() {
 
     /** Tests the case that task restarting is accepted. */
     @Test
-    public void testNormalFailureHandling() {
+    void testNormalFailureHandling() throws Exception {
         final Set<ExecutionVertexID> tasksToRestart =
                 Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
         failoverStrategy.setTasksToRestart(tasksToRestart);
 
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         Exception cause = new Exception("test failure");
         long timestamp = System.currentTimeMillis();
         // trigger a task failure
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), cause, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, cause, timestamp);
 
         // verify results
-        assertTrue(result.canRestart());
-        assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
-        assertEquals(tasksToRestart, result.getVerticesToRestart());
-        assertThat(result.getError(), is(cause));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertEquals(1, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isTrue();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();

Review Comment:
   `assertThat(result.getFailedExecution().isPresent()).isTrue();` -> `assertThat(result.getFailedExecution()).isPresent();`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##########
@@ -52,38 +52,27 @@ public class FailureHandlingResultSnapshot {
      *
      * @param failureHandlingResult The {@code FailureHandlingResult} that is used for extracting
      *     the failure information.
-     * @param latestExecutionLookup The look-up function for retrieving the latest {@link Execution}
-     *     instance for a given {@link ExecutionVertexID}.
+     * @param currentExecutionsLookup The look-up function for retrieving all the current {@link
+     *     Execution} instance for a given {@link ExecutionVertexID}.

Review Comment:
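   Typo in the Javadoc: `instance` -> `instances` (the look-up function retrieves all the current executions, so the noun should be plural).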



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java:
##########
@@ -71,122 +74,125 @@ public void setUp() {
 
     /** Tests the case that task restarting is accepted. */
     @Test
-    public void testNormalFailureHandling() {
+    void testNormalFailureHandling() throws Exception {
         final Set<ExecutionVertexID> tasksToRestart =
                 Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
         failoverStrategy.setTasksToRestart(tasksToRestart);
 
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         Exception cause = new Exception("test failure");
         long timestamp = System.currentTimeMillis();
         // trigger a task failure
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), cause, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, cause, timestamp);
 
         // verify results
-        assertTrue(result.canRestart());
-        assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
-        assertEquals(tasksToRestart, result.getVerticesToRestart());
-        assertThat(result.getError(), is(cause));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertEquals(1, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isTrue();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();
+        assertThat(result.getFailedExecution().get()).isSameAs(execution);
+        assertThat(result.getRestartDelayMS()).isEqualTo(RESTART_DELAY_MS);
+        assertThat(result.getVerticesToRestart()).isEqualTo(tasksToRestart);
+        assertThat(result.getError()).isSameAs(cause);
+        assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(executionFailureHandler.getNumberOfRestarts()).isEqualTo(1);
     }
 
     /** Tests the case that task restarting is suppressed. */
     @Test
-    public void testRestartingSuppressedFailureHandlingResult() {
+    void testRestartingSuppressedFailureHandlingResult() throws Exception {
         // restart strategy suppresses restarting
         backoffTimeStrategy.setCanRestart(false);
 
         // trigger a task failure
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         final Throwable error = new Exception("expected test failure");
         final long timestamp = System.currentTimeMillis();
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), error, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, error, timestamp);
 
         // verify results
-        assertFalse(result.canRestart());
-        assertThat(result.getError(), containsCause(error));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
-        try {
-            result.getVerticesToRestart();
-            fail("get tasks to restart is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        try {
-            result.getRestartDelayMS();
-            fail("get restart delay is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        assertEquals(0, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isFalse();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();

Review Comment:
   `assertThat(result.getFailedExecution().isPresent()).isTrue();` -> `assertThat(result.getFailedExecution()).isPresent();`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java:
##########
@@ -71,122 +74,125 @@ public void setUp() {
 
     /** Tests the case that task restarting is accepted. */
     @Test
-    public void testNormalFailureHandling() {
+    void testNormalFailureHandling() throws Exception {
         final Set<ExecutionVertexID> tasksToRestart =
                 Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
         failoverStrategy.setTasksToRestart(tasksToRestart);
 
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         Exception cause = new Exception("test failure");
         long timestamp = System.currentTimeMillis();
         // trigger a task failure
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), cause, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, cause, timestamp);
 
         // verify results
-        assertTrue(result.canRestart());
-        assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
-        assertEquals(tasksToRestart, result.getVerticesToRestart());
-        assertThat(result.getError(), is(cause));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertEquals(1, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isTrue();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();
+        assertThat(result.getFailedExecution().get()).isSameAs(execution);
+        assertThat(result.getRestartDelayMS()).isEqualTo(RESTART_DELAY_MS);
+        assertThat(result.getVerticesToRestart()).isEqualTo(tasksToRestart);
+        assertThat(result.getError()).isSameAs(cause);
+        assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(executionFailureHandler.getNumberOfRestarts()).isEqualTo(1);
     }
 
     /** Tests the case that task restarting is suppressed. */
     @Test
-    public void testRestartingSuppressedFailureHandlingResult() {
+    void testRestartingSuppressedFailureHandlingResult() throws Exception {
         // restart strategy suppresses restarting
         backoffTimeStrategy.setCanRestart(false);
 
         // trigger a task failure
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         final Throwable error = new Exception("expected test failure");
         final long timestamp = System.currentTimeMillis();
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), error, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, error, timestamp);
 
         // verify results
-        assertFalse(result.canRestart());
-        assertThat(result.getError(), containsCause(error));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
-        try {
-            result.getVerticesToRestart();
-            fail("get tasks to restart is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        try {
-            result.getRestartDelayMS();
-            fail("get restart delay is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        assertEquals(0, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isFalse();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();
+        assertThat(result.getFailedExecution().get()).isSameAs(execution);
+        assertThat(result.getError()).hasCause(error);
+        assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError())).isFalse();
+
+        assertThatThrownBy(result::getVerticesToRestart)
+                .as("getVerticesToRestart is not allowed when restarting is suppressed")
+                .isInstanceOf(IllegalStateException.class);
+
+        assertThatThrownBy(result::getRestartDelayMS)
+                .as("getRestartDelayMS is not allowed when restarting is suppressed")
+                .isInstanceOf(IllegalStateException.class);
+
+        assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
     /** Tests the case that the failure is non-recoverable type. */
     @Test
-    public void testNonRecoverableFailureHandlingResult() {
+    void testNonRecoverableFailureHandlingResult() throws Exception {
+
         // trigger an unrecoverable task failure
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         final Throwable error =
                 new Exception(new SuppressRestartsException(new Exception("test failure")));
         final long timestamp = System.currentTimeMillis();
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), error, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, error, timestamp);
 
         // verify results
-        assertFalse(result.canRestart());
-        assertNotNull(result.getError());
-        assertTrue(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
-        assertThat(result.getTimestamp(), is(timestamp));
-        try {
-            result.getVerticesToRestart();
-            fail("get tasks to restart is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        try {
-            result.getRestartDelayMS();
-            fail("get restart delay is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        assertEquals(0, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isFalse();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();

Review Comment:
   `assertThat(result.getFailedExecution().isPresent()).isTrue();` -> `assertThat(result.getFailedExecution()).isPresent();`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org