Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/08 09:23:44 UTC

[GitHub] [flink] zhuzhurk opened a new pull request, #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

zhuzhurk opened a new pull request, #20221:
URL: https://github.com/apache/flink/pull/20221

   ## What is the purpose of the change
   
   Previously, FailureHandlingResultSnapshot always treated the single current attempt of an execution vertex as the failed execution. This no longer holds with speculative execution, where an execution vertex can have multiple current executions and any of them may fail. This change creates the snapshot from the execution that actually failed.
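
The difference can be illustrated with a minimal sketch. It does not use Flink's classes: `Attempt`, `FailedAttemptSketch`, and all method names below are hypothetical stand-ins. It assumes a vertex holds a list of current attempts and that the failing attempt is reported explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for one execution attempt; this is not Flink's Execution class. */
final class Attempt {
    final int attemptNumber;
    final boolean failed;

    Attempt(int attemptNumber, boolean failed) {
        this.attemptNumber = attemptNumber;
        this.failed = failed;
    }
}

/** Illustrative helpers only; the names and signatures are assumptions. */
final class FailedAttemptSketch {

    /** Old assumption: a vertex has one current attempt, so that attempt is the failed one. */
    static Attempt assumeOnlyCurrentAttempt(List<Attempt> currentAttempts) {
        return currentAttempts.get(0);
    }

    /**
     * Revised idea: the caller reports the attempt that failed. Assumption: the Optional
     * is empty when a failure is not tied to a single attempt.
     */
    static Optional<Attempt> rootCause(Attempt reportedFailedAttempt) {
        return Optional.ofNullable(reportedFailedAttempt);
    }

    /** Other current attempts that also failed, excluding the reported root cause. */
    static List<Attempt> concurrentFailures(List<Attempt> currentAttempts, Attempt rootCause) {
        List<Attempt> result = new ArrayList<>();
        for (Attempt attempt : currentAttempts) {
            if (attempt != rootCause && attempt.failed) {
                result.add(attempt);
            }
        }
        return result;
    }
}
```

With two current attempts where only the second one failed, `assumeOnlyCurrentAttempt` returns the healthy first attempt, while `rootCause` returns the attempt that was actually reported as failed.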
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zhuzhurk commented on pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20221:
URL: https://github.com/apache/flink/pull/20221#issuecomment-1180182443

   @flinkbot run azure




[GitHub] [flink] zhuzhurk commented on pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20221:
URL: https://github.com/apache/flink/pull/20221#issuecomment-1180241596

   @flinkbot run azure




[GitHub] [flink] zhuzhurk commented on pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20221:
URL: https://github.com/apache/flink/pull/20221#issuecomment-1180611626

   Thanks for reviewing! @wanglijie95 
   Merging.




[GitHub] [flink] zhuzhurk commented on pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20221:
URL: https://github.com/apache/flink/pull/20221#issuecomment-1179457223

   @flinkbot run azure




[GitHub] [flink] zhuzhurk commented on pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20221:
URL: https://github.com/apache/flink/pull/20221#issuecomment-1179744012

   @flinkbot run azure




[GitHub] [flink] zhuzhurk commented on pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20221:
URL: https://github.com/apache/flink/pull/20221#issuecomment-1180561038

   @flinkbot run azure




[GitHub] [flink] flinkbot commented on pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20221:
URL: https://github.com/apache/flink/pull/20221#issuecomment-1178767176

   ## CI report:
   
   * 61eee34c59becc28c5c309e1395a533a76304f81 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] zhuzhurk closed pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk closed pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution
URL: https://github.com/apache/flink/pull/20221




[GitHub] [flink] zhuzhurk commented on pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20221:
URL: https://github.com/apache/flink/pull/20221#issuecomment-1180474121

   @flinkbot run azure




[GitHub] [flink] wanglijie95 commented on a diff in pull request #20221: [FLINK-28402] Create FailureHandlingResultSnapshot with the truly failed execution

Posted by GitBox <gi...@apache.org>.
wanglijie95 commented on code in PR #20221:
URL: https://github.com/apache/flink/pull/20221#discussion_r917641257


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java:
##########
@@ -71,122 +74,125 @@ public void setUp() {
 
     /** Tests the case that task restarting is accepted. */
     @Test
-    public void testNormalFailureHandling() {
+    void testNormalFailureHandling() throws Exception {
         final Set<ExecutionVertexID> tasksToRestart =
                 Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
         failoverStrategy.setTasksToRestart(tasksToRestart);
 
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         Exception cause = new Exception("test failure");
         long timestamp = System.currentTimeMillis();
         // trigger a task failure
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), cause, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, cause, timestamp);
 
         // verify results
-        assertTrue(result.canRestart());
-        assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
-        assertEquals(tasksToRestart, result.getVerticesToRestart());
-        assertThat(result.getError(), is(cause));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertEquals(1, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isTrue();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();

Review Comment:
   `assertThat(result.getFailedExecution().isPresent()).isTrue();` -> `assertThat(result.getFailedExecution()).isPresent();`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/FailureHandlingResultSnapshot.java:
##########
@@ -52,38 +52,27 @@ public class FailureHandlingResultSnapshot {
      *
      * @param failureHandlingResult The {@code FailureHandlingResult} that is used for extracting
      *     the failure information.
-     * @param latestExecutionLookup The look-up function for retrieving the latest {@link Execution}
-     *     instance for a given {@link ExecutionVertexID}.
+     * @param currentExecutionsLookup The look-up function for retrieving all the current {@link
+     *     Execution} instance for a given {@link ExecutionVertexID}.

Review Comment:
   instance -> instances



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java:
##########
@@ -71,122 +74,125 @@ public void setUp() {
 
     /** Tests the case that task restarting is accepted. */
     @Test
-    public void testNormalFailureHandling() {
+    void testNormalFailureHandling() throws Exception {
         final Set<ExecutionVertexID> tasksToRestart =
                 Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
         failoverStrategy.setTasksToRestart(tasksToRestart);
 
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         Exception cause = new Exception("test failure");
         long timestamp = System.currentTimeMillis();
         // trigger a task failure
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), cause, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, cause, timestamp);
 
         // verify results
-        assertTrue(result.canRestart());
-        assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
-        assertEquals(tasksToRestart, result.getVerticesToRestart());
-        assertThat(result.getError(), is(cause));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertEquals(1, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isTrue();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();
+        assertThat(result.getFailedExecution().get()).isSameAs(execution);
+        assertThat(result.getRestartDelayMS()).isEqualTo(RESTART_DELAY_MS);
+        assertThat(result.getVerticesToRestart()).isEqualTo(tasksToRestart);
+        assertThat(result.getError()).isSameAs(cause);
+        assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(executionFailureHandler.getNumberOfRestarts()).isEqualTo(1);
     }
 
     /** Tests the case that task restarting is suppressed. */
     @Test
-    public void testRestartingSuppressedFailureHandlingResult() {
+    void testRestartingSuppressedFailureHandlingResult() throws Exception {
         // restart strategy suppresses restarting
         backoffTimeStrategy.setCanRestart(false);
 
         // trigger a task failure
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         final Throwable error = new Exception("expected test failure");
         final long timestamp = System.currentTimeMillis();
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), error, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, error, timestamp);
 
         // verify results
-        assertFalse(result.canRestart());
-        assertThat(result.getError(), containsCause(error));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
-        try {
-            result.getVerticesToRestart();
-            fail("get tasks to restart is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        try {
-            result.getRestartDelayMS();
-            fail("get restart delay is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        assertEquals(0, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isFalse();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();

Review Comment:
   `assertThat(result.getFailedExecution().isPresent()).isTrue();` -> `assertThat(result.getFailedExecution()).isPresent();`



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java:
##########
@@ -71,122 +74,125 @@ public void setUp() {
 
     /** Tests the case that task restarting is accepted. */
     @Test
-    public void testNormalFailureHandling() {
+    void testNormalFailureHandling() throws Exception {
         final Set<ExecutionVertexID> tasksToRestart =
                 Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
         failoverStrategy.setTasksToRestart(tasksToRestart);
 
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         Exception cause = new Exception("test failure");
         long timestamp = System.currentTimeMillis();
         // trigger a task failure
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), cause, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, cause, timestamp);
 
         // verify results
-        assertTrue(result.canRestart());
-        assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS());
-        assertEquals(tasksToRestart, result.getVerticesToRestart());
-        assertThat(result.getError(), is(cause));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertEquals(1, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isTrue();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();
+        assertThat(result.getFailedExecution().get()).isSameAs(execution);
+        assertThat(result.getRestartDelayMS()).isEqualTo(RESTART_DELAY_MS);
+        assertThat(result.getVerticesToRestart()).isEqualTo(tasksToRestart);
+        assertThat(result.getError()).isSameAs(cause);
+        assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(executionFailureHandler.getNumberOfRestarts()).isEqualTo(1);
     }
 
     /** Tests the case that task restarting is suppressed. */
     @Test
-    public void testRestartingSuppressedFailureHandlingResult() {
+    void testRestartingSuppressedFailureHandlingResult() throws Exception {
         // restart strategy suppresses restarting
         backoffTimeStrategy.setCanRestart(false);
 
         // trigger a task failure
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         final Throwable error = new Exception("expected test failure");
         final long timestamp = System.currentTimeMillis();
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), error, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, error, timestamp);
 
         // verify results
-        assertFalse(result.canRestart());
-        assertThat(result.getError(), containsCause(error));
-        assertThat(result.getTimestamp(), is(timestamp));
-        assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
-        try {
-            result.getVerticesToRestart();
-            fail("get tasks to restart is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        try {
-            result.getRestartDelayMS();
-            fail("get restart delay is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        assertEquals(0, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isFalse();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();
+        assertThat(result.getFailedExecution().get()).isSameAs(execution);
+        assertThat(result.getError()).hasCause(error);
+        assertThat(result.getTimestamp()).isEqualTo(timestamp);
+        assertThat(ExecutionFailureHandler.isUnrecoverableError(result.getError())).isFalse();
+
+        assertThatThrownBy(result::getVerticesToRestart)
+                .as("getVerticesToRestart is not allowed when restarting is suppressed")
+                .isInstanceOf(IllegalStateException.class);
+
+        assertThatThrownBy(result::getRestartDelayMS)
+                .as("getRestartDelayMS is not allowed when restarting is suppressed")
+                .isInstanceOf(IllegalStateException.class);
+
+        assertThat(executionFailureHandler.getNumberOfRestarts()).isZero();
     }
 
     /** Tests the case that the failure is non-recoverable type. */
     @Test
-    public void testNonRecoverableFailureHandlingResult() {
+    void testNonRecoverableFailureHandlingResult() throws Exception {
+
         // trigger an unrecoverable task failure
+        Execution execution = createExecution(EXECUTOR_RESOURCE.getExecutor());
         final Throwable error =
                 new Exception(new SuppressRestartsException(new Exception("test failure")));
         final long timestamp = System.currentTimeMillis();
         final FailureHandlingResult result =
-                executionFailureHandler.getFailureHandlingResult(
-                        new ExecutionVertexID(new JobVertexID(), 0), error, timestamp);
+                executionFailureHandler.getFailureHandlingResult(execution, error, timestamp);
 
         // verify results
-        assertFalse(result.canRestart());
-        assertNotNull(result.getError());
-        assertTrue(ExecutionFailureHandler.isUnrecoverableError(result.getError()));
-        assertThat(result.getTimestamp(), is(timestamp));
-        try {
-            result.getVerticesToRestart();
-            fail("get tasks to restart is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        try {
-            result.getRestartDelayMS();
-            fail("get restart delay is not allowed when restarting is suppressed");
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-        assertEquals(0, executionFailureHandler.getNumberOfRestarts());
+        assertThat(result.canRestart()).isFalse();
+        assertThat(result.getFailedExecution().isPresent()).isTrue();

Review Comment:
   `assertThat(result.getFailedExecution().isPresent()).isTrue();` -> `assertThat(result.getFailedExecution()).isPresent();`


