Posted to commits@flink.apache.org by ch...@apache.org on 2024/03/28 16:46:10 UTC

(flink) branch release-1.18 updated (94d1363c27e -> 20c506d76c9)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 94d1363c27e [FLINK-34933][test] Fixes JobMasterServiceLeadershipRunnerTest#testResultFutureCompletionOfOutdatedLeaderIsIgnored
     new fcb581f0039 [FLINK-34922][rest] Support concurrent global failure
     new 20c506d76c9 [FLINK-34922] Adds ITCase for GlobalFailureOnRestart

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../rest/handler/job/JobExceptionsHandler.java     |  12 ++
 .../rest/handler/job/JobExceptionsHandlerTest.java |  53 +++++-
 .../test/scheduling/AdaptiveSchedulerITCase.java   | 197 +++++++++++++++++++--
 3 files changed, 247 insertions(+), 15 deletions(-)


(flink) 02/02: [FLINK-34922] Adds ITCase for GlobalFailureOnRestart

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20c506d76c99c2e6c3f30a039acd0366d3448c87
Author: Panagiotis Garefalakis <pg...@confluent.io>
AuthorDate: Wed Mar 27 22:23:48 2024 -0700

    [FLINK-34922] Adds ITCase for GlobalFailureOnRestart
    
    Adds an ITCase where a global failure is triggered while the scheduler is restarting,
    and asserts that this failure is handled such that it can be retrieved via the REST API.
---
 .../test/scheduling/AdaptiveSchedulerITCase.java   | 197 +++++++++++++++++++--
 1 file changed, 183 insertions(+), 14 deletions(-)
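
The test polls the job's exception history through the REST API until the globally
triggered failure shows up. A minimal sketch of that query, mirroring the
getJobExceptions helper added in the diff below (here parameterized directly on the
RestClusterClient rather than on the MiniClusterWithClientResource used by the test):

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.client.program.rest.RestClusterClient;
    import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
    import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
    import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;

    /** Fetches a job's exception history via the /jobs/:jobid/exceptions REST endpoint. */
    static CompletableFuture<JobExceptionsInfoWithHistory> getJobExceptions(
            RestClusterClient<?> restClusterClient, JobID jobId) {
        final JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
        // Bind the job id into the request path.
        params.jobPathParameter.resolve(jobId);
        return restClusterClient.sendRequest(
                JobExceptionsHeaders.getInstance(), params, EmptyRequestBody.getInstance());
    }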

diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
index a4124dfe08c..d15f3ae7ef4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.test.scheduling;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
@@ -30,9 +33,20 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
 import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -48,8 +62,10 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,11 +78,13 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -266,25 +284,78 @@ public class AdaptiveSchedulerITCase extends TestLogger {
         final JobClient jobClient = env.executeAsync();
         CommonTestUtils.waitUntilCondition(
                 () -> {
-                    final RestClusterClient<?> restClusterClient =
-                            MINI_CLUSTER_WITH_CLIENT_RESOURCE.getRestClusterClient();
-                    final JobExceptionsMessageParameters params =
-                            new JobExceptionsMessageParameters();
-                    params.jobPathParameter.resolve(jobClient.getJobID());
-                    final CompletableFuture<JobExceptionsInfoWithHistory> exceptionsFuture =
-                            restClusterClient.sendRequest(
-                                    JobExceptionsHeaders.getInstance(),
-                                    params,
-                                    EmptyRequestBody.getInstance());
-                    final JobExceptionsInfoWithHistory jobExceptionsInfoWithHistory =
-                            exceptionsFuture.get();
-                    return jobExceptionsInfoWithHistory.getExceptionHistory().getEntries().size()
-                            > 0;
+                    final List<RootExceptionInfo> exceptions =
+                            getJobExceptions(
+                                            jobClient.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE)
+                                    .get()
+                                    .getExceptionHistory()
+                                    .getEntries();
+                    return !exceptions.isEmpty();
                 });
         jobClient.cancel().get();
         CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.CANCELED));
     }
 
+    @Test
+    public void testGlobalFailureOnRestart() throws Exception {
+        final MiniCluster miniCluster = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster();
+
+        final JobVertexID jobVertexId = new JobVertexID();
+        final JobVertex jobVertex = new JobVertex("jobVertex", jobVertexId);
+        jobVertex.setInvokableClass(FailingInvokable.class);
+        jobVertex.addOperatorCoordinator(
+                new SerializedValue<>(
+                        new FailingCoordinatorProvider(OperatorID.fromJobVertexID(jobVertexId))));
+        jobVertex.setParallelism(1);
+
+        final ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.hours(1)));
+
+        final JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .addJobVertices(Collections.singletonList(jobVertex))
+                        .setExecutionConfig(executionConfig)
+                        .build();
+        miniCluster.submitJob(jobGraph).join();
+
+        // We rely on waiting in restarting state (see the restart strategy above)
+        CommonTestUtils.waitUntilCondition(
+                () -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == JobStatus.RESTARTING);
+        FailingCoordinatorProvider.JOB_RESTARTING.countDown();
+
+        assertThatFuture(getJobExceptions(jobGraph.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE))
+                .eventuallySucceeds();
+
+        miniCluster.cancelJob(jobGraph.getJobID());
+        CommonTestUtils.waitUntilCondition(
+                () -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == JobStatus.CANCELED);
+
+        final JobExceptionsInfoWithHistory jobExceptions =
+                getJobExceptions(jobGraph.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE).get();
+
+        // there should be exactly 1 root exception in the history from the failing vertex,
+        // as the global coordinator failure should be treated as a concurrent exception
+        Assertions.assertThat(jobExceptions.getExceptionHistory().getEntries())
+                .hasSize(1)
+                .allSatisfy(
+                        rootExceptionInfo ->
+                                Assertions.assertThat(rootExceptionInfo.getStacktrace())
+                                        .contains(FailingInvokable.localExceptionMsg)
+                                        .doesNotContain(
+                                                FailingCoordinatorProvider.globalExceptionMsg))
+                .allSatisfy(
+                        rootExceptionInfo ->
+                                Assertions.assertThat(rootExceptionInfo.getConcurrentExceptions())
+                                        .anySatisfy(
+                                                exceptionInfo ->
+                                                        Assertions.assertThat(
+                                                                        exceptionInfo
+                                                                                .getStacktrace())
+                                                                .contains(
+                                                                        FailingCoordinatorProvider
+                                                                                .globalExceptionMsg)));
+    }
+
     private boolean isDirectoryEmpty(File directory) {
         File[] files = directory.listFiles();
         if (files.length > 0) {
@@ -306,6 +377,104 @@ public class AdaptiveSchedulerITCase extends TestLogger {
         return env;
     }
 
+    private static CompletableFuture<JobExceptionsInfoWithHistory> getJobExceptions(
+            JobID jobId, MiniClusterWithClientResource minClusterRes) throws Exception {
+        final RestClusterClient<?> restClusterClient = minClusterRes.getRestClusterClient();
+        final JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+        return restClusterClient.sendRequest(
+                JobExceptionsHeaders.getInstance(), params, EmptyRequestBody.getInstance());
+    }
+
+    /** Simple invokable which fails immediately after being invoked. */
+    public static class FailingInvokable extends AbstractInvokable {
+        private static final String localExceptionMsg = "Local exception.";
+
+        public FailingInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            throw new Exception(localExceptionMsg);
+        }
+    }
+
+    private static class FailingCoordinatorProvider implements OperatorCoordinator.Provider {
+
+        private static final CountDownLatch JOB_RESTARTING = new CountDownLatch(1);
+
+        private final OperatorID operatorId;
+        private static final String globalExceptionMsg = "Global exception.";
+
+        FailingCoordinatorProvider(OperatorID operatorId) {
+            this.operatorId = operatorId;
+        }
+
+        @Override
+        public OperatorID getOperatorId() {
+            return operatorId;
+        }
+
+        @Override
+        public OperatorCoordinator create(OperatorCoordinator.Context context) {
+            return new OperatorCoordinator() {
+
+                @Nullable private Thread thread;
+
+                @Override
+                public void start() {
+                    thread =
+                            new Thread(
+                                    () -> {
+                                        try {
+                                            JOB_RESTARTING.await();
+                                            context.failJob(new Exception(globalExceptionMsg));
+                                        } catch (InterruptedException e) {
+                                            Thread.currentThread().interrupt();
+                                        }
+                                    });
+                    thread.setName(AdaptiveSchedulerITCase.class + "_failing-coordinator");
+                    thread.setDaemon(true);
+                    thread.start();
+                }
+
+                @Override
+                public void close() throws Exception {
+                    if (thread != null) {
+                        thread.interrupt();
+                        thread.join();
+                    }
+                }
+
+                @Override
+                public void handleEventFromOperator(
+                        int subtask, int attemptNumber, OperatorEvent event) {}
+
+                @Override
+                public void checkpointCoordinator(
+                        long checkpointId, CompletableFuture<byte[]> resultFuture) {}
+
+                @Override
+                public void notifyCheckpointComplete(long checkpointId) {}
+
+                @Override
+                public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) {}
+
+                @Override
+                public void subtaskReset(int subtask, long checkpointId) {}
+
+                @Override
+                public void executionAttemptFailed(
+                        int subtask, int attemptNumber, @Nullable Throwable reason) {}
+
+                @Override
+                public void executionAttemptReady(
+                        int subtask, int attemptNumber, SubtaskGateway gateway) {}
+            };
+        }
+    }
+
     private static final class DummySource extends RichParallelSourceFunction<Integer>
             implements CheckpointedFunction, CheckpointListener {
         private final StopWithSavepointTestBehavior behavior;

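Two pieces of the diff above make the scenario deterministic: a fixed-delay restart
strategy with a long delay keeps the job parked in RESTARTING after the invokable's
local failure, and the operator coordinator raises the global failure only once the
test has observed that state. Condensed from the patch (all identifiers are the
test's own):

    // Keep the job in RESTARTING for a long time after the local task failure.
    final ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.hours(1)));

    // Once the test sees RESTARTING, it releases the latch, and the coordinator's
    // background thread injects a global failure while the scheduler is still restarting.
    CommonTestUtils.waitUntilCondition(
            () -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == JobStatus.RESTARTING);
    FailingCoordinatorProvider.JOB_RESTARTING.countDown();
    // ... inside the coordinator's thread:
    //     JOB_RESTARTING.await();
    //     context.failJob(new Exception(globalExceptionMsg));
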

(flink) 01/02: [FLINK-34922][rest] Support concurrent global failure

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fcb581f0039f9704b6eaf15a2fabaa4e05d79048
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Mar 27 09:33:08 2024 +0100

    [FLINK-34922][rest] Support concurrent global failure
---
 .../rest/handler/job/JobExceptionsHandler.java     | 12 +++++
 .../rest/handler/job/JobExceptionsHandlerTest.java | 53 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 1 deletion(-)
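
The handler change below adds a branch for global failures: a global failure is not
attributable to a single task, so the task-related fields of the resulting
ExceptionInfo stay null instead of tripping assertLocalExceptionInfo. An annotated
sketch of that branch follows; the interpretation of the three trailing nulls (task
name, location, task manager id) follows the matcher arguments used in the new test:

    // New branch in JobExceptionsHandler#createExceptionInfo: a global failure has no
    // failing task, so the task-related fields of the REST representation are left null.
    if (exceptionHistoryEntry.isGlobal()) {
        return new JobExceptionsInfoWithHistory.ExceptionInfo(
                exceptionHistoryEntry.getException().getOriginalErrorClassName(),
                exceptionHistoryEntry.getExceptionAsString(),
                exceptionHistoryEntry.getTimestamp(),
                exceptionHistoryEntry.getFailureLabels(),
                null, // task name
                null, // location
                null); // task manager id
    }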

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 5ece82a2671..84140c8c007 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -248,6 +248,18 @@ public class JobExceptionsHandler
 
     private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
             ExceptionHistoryEntry exceptionHistoryEntry) {
+
+        if (exceptionHistoryEntry.isGlobal()) {
+            return new JobExceptionsInfoWithHistory.ExceptionInfo(
+                    exceptionHistoryEntry.getException().getOriginalErrorClassName(),
+                    exceptionHistoryEntry.getExceptionAsString(),
+                    exceptionHistoryEntry.getTimestamp(),
+                    exceptionHistoryEntry.getFailureLabels(),
+                    null,
+                    null,
+                    null);
+        }
+
         assertLocalExceptionInfo(exceptionHistoryEntry);
 
         return new JobExceptionsInfoWithHistory.ExceptionInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index efce7903686..c7699c6f951 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 
+import org.assertj.core.api.Assertions;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
@@ -64,6 +65,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +75,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
+import static org.assertj.core.api.HamcrestCondition.matching;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.collection.IsEmptyCollection.empty;
@@ -214,6 +217,47 @@ public class JobExceptionsHandlerTest extends TestLogger {
         assertFalse(response.getExceptionHistory().isTruncated());
     }
 
+    @Test
+    public void testWithExceptionHistoryAndConcurrentGlobalFailure()
+            throws HandlerRequestException, ExecutionException, InterruptedException {
+        final ExceptionHistoryEntry otherFailure =
+                ExceptionHistoryEntry.createGlobal(
+                        new RuntimeException("exception #1"),
+                        CompletableFuture.completedFuture(Collections.emptyMap()));
+        final RootExceptionHistoryEntry rootCause =
+                fromGlobalFailure(
+                        new RuntimeException("exception #0"),
+                        System.currentTimeMillis(),
+                        Collections.singleton(otherFailure));
+
+        final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause);
+        final HandlerRequest<EmptyRequestBody> request =
+                createRequest(executionGraphInfo.getJobId(), 10);
+        final JobExceptionsInfoWithHistory response =
+                testInstance.handleRequest(request, executionGraphInfo);
+
+        Assertions.assertThat(response.getExceptionHistory().getEntries())
+                .hasSize(1)
+                .satisfies(
+                        matching(
+                                contains(
+                                        historyContainsGlobalFailure(
+                                                rootCause.getException(),
+                                                rootCause.getTimestamp(),
+                                                matchesFailure(
+                                                        otherFailure.getException(),
+                                                        otherFailure.getTimestamp(),
+                                                        otherFailure.getFailureLabelsFuture(),
+                                                        otherFailure.getFailingTaskName(),
+                                                        JobExceptionsHandler.toString(
+                                                                otherFailure
+                                                                        .getTaskManagerLocation()),
+                                                        JobExceptionsHandler.toTaskManagerId(
+                                                                otherFailure
+                                                                        .getTaskManagerLocation()))))));
+        Assertions.assertThat(response.getExceptionHistory().isTruncated()).isFalse();
+    }
+
     @Test
     public void testWithExceptionHistoryWithMatchingFailureLabel()
             throws HandlerRequestException, ExecutionException, InterruptedException {
@@ -532,13 +576,20 @@ public class JobExceptionsHandlerTest extends TestLogger {
     }
 
     private static RootExceptionHistoryEntry fromGlobalFailure(Throwable cause, long timestamp) {
+        return fromGlobalFailure(cause, timestamp, Collections.emptySet());
+    }
+
+    private static RootExceptionHistoryEntry fromGlobalFailure(
+            Throwable cause,
+            long timestamp,
+            Collection<ExceptionHistoryEntry> concurrentExceptions) {
         return new RootExceptionHistoryEntry(
                 cause,
                 timestamp,
                 FailureEnricherUtils.EMPTY_FAILURE_LABELS,
                 null,
                 null,
-                Collections.emptySet());
+                concurrentExceptions);
     }
 
     // -------- factory methods for instantiating new Matchers --------