You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/01/29 15:40:41 UTC

[flink] 01/02: [FLINK-25486][Runtime/Coordination] Fix the bug that Flink will lose state when the ZooKeeper leader changes

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ba13f37afb9164f3bb17de78c4b0d85b1633638
Author: liujiangang <li...@kuaishou.com>
AuthorDate: Fri Jan 7 18:28:15 2022 +0800

    [FLINK-25486][Runtime/Coordination] Fix the bug that Flink will lose state when the ZooKeeper leader changes
    
    This closes #18296.
---
 .../flink/runtime/dispatcher/MiniDispatcher.java   |  21 ++-
 .../runtime/dispatcher/JobDispatcherITCase.java    | 210 +++++++++++++++++++++
 .../runtime/dispatcher/MiniDispatcherTest.java     |  33 ++++
 3 files changed, 258 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index bc5776e..76afe73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -34,6 +35,7 @@ import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
 
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -106,8 +108,12 @@ public class MiniDispatcher extends Dispatcher {
                                         ? ApplicationStatus.FAILED
                                         : ApplicationStatus.SUCCEEDED;
 
-                        log.info("Shutting down cluster because someone retrieved the job result.");
-                        shutDownFuture.complete(status);
+                        if (!ApplicationStatus.UNKNOWN.equals(result.getApplicationStatus())) {
+                            log.info(
+                                    "Shutting down cluster because someone retrieved the job result"
+                                            + " and the status is globally terminal.");
+                            shutDownFuture.complete(status);
+                        }
                     });
         } else {
             log.info("Not shutting down cluster after someone retrieved the job result.");
@@ -128,16 +134,19 @@ public class MiniDispatcher extends Dispatcher {
                 executionGraphInfo.getArchivedExecutionGraph();
         final CleanupJobState cleanupHAState = super.jobReachedTerminalState(executionGraphInfo);
 
-        if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
+        JobStatus jobStatus =
+                Objects.requireNonNull(
+                        archivedExecutionGraph.getState(), "JobStatus should not be null here.");
+        if (jobStatus.isGloballyTerminalState()
+                && (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) {
             // shut down if job is cancelled or we don't have to wait for the execution result
             // retrieval
             log.info(
                     "Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}",
-                    archivedExecutionGraph.getState(),
+                    jobStatus,
                     jobCancelled,
                     executionMode);
-            shutDownFuture.complete(
-                    ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
+            shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
         }
 
         return cleanupHAState;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
new file mode 100644
index 0000000..f565960
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** Tests that a job recovers from a checkpoint after losing and regaining leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                JobDispatcherLeaderProcessFactoryFactory.create(
+                                        FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration());
+        long checkpointInterval = 500;
+        JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                                        newConfiguration));
+
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // wait until job is running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+            CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false);
+            CommonTestUtils.waitUntilCondition(
+                    () -> queryCompletedCheckpoints(cluster, jobID) > 0L,
+                    Deadline.fromNow(Duration.ofSeconds(30)),
+                    checkpointInterval / 2);
+
+            final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobID);
+            haServices.revokeDispatcherLeadership();
+            // wait for the UNKNOWN result to make sure the leadership is revoked (avoids races)
+            Assertions.assertEquals(
+                    ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus());
+
+            haServices.grantDispatcherLeadership();
+
+            // the job is suspended at this point; wait until it is running again
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+
+            assertNotNull(
+                    cluster.getArchivedExecutionGraph(jobID)
+                            .get()
+                            .getCheckpointStatsSnapshot()
+                            .getLatestRestoredCheckpoint());
+
+            cluster.cancelJob(jobID);
+
+            // the cluster should shut down automatically once the application completes
+            CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline);
+        }
+    }
+
+    private JobID generateJobGraph(Configuration configuration, long checkpointInterval)
+            throws Exception {
+        final JobVertex jobVertex = new JobVertex("jobVertex");
+        jobVertex.setInvokableClass(
+                AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable.class);
+        jobVertex.setParallelism(1);
+
+        final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
+                CheckpointCoordinatorConfiguration.builder()
+                        .setCheckpointInterval(checkpointInterval)
+                        .build();
+        final JobCheckpointingSettings checkpointingSettings =
+                new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
+        JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .addJobVertex(jobVertex)
+                        .setJobCheckpointingSettings(checkpointingSettings)
+                        .build();
+
+        final Path jobGraphPath =
+                TEMPORARY_FOLDER.newFile(JOB_GRAPH_FILE_PATH.defaultValue()).toPath();
+        try (ObjectOutputStream objectOutputStream =
+                new ObjectOutputStream(Files.newOutputStream(jobGraphPath, CREATE))) {
+            objectOutputStream.writeObject(jobGraph);
+        }
+        configuration.setString(JOB_GRAPH_FILE_PATH.key(), jobGraphPath.toString());
+        return jobGraph.getJobID();
+    }
+
+    private long queryCompletedCheckpoints(MiniCluster miniCluster, JobID jobID)
+            throws InterruptedException, ExecutionException {
+        return miniCluster
+                .getArchivedExecutionGraph(jobID)
+                .get()
+                .getCheckpointStatsSnapshot()
+                .getCounts()
+                .getNumberOfCompletedCheckpoints();
+    }
+
+    private static void awaitJobStatus(
+            MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline)
+            throws Exception {
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    try {
+                        return cluster.getJobStatus(jobId).get() == status;
+                    } catch (ExecutionException e) {
+                        if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class)
+                                .isPresent()) {
+                            // the job may not have been submitted yet
+                            return false;
+                        }
+                        throw e;
+                    }
+                },
+                deadline);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index af40e71..811f303 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -58,11 +58,13 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /** Tests for the {@link MiniDispatcher}. */
 public class MiniDispatcherTest extends TestLogger {
@@ -202,6 +204,37 @@ public class MiniDispatcherTest extends TestLogger {
     }
 
     /**
+     * Tests that in detached mode, the {@link MiniDispatcher} does not complete the future that
+     * signals job termination if the JobStatus is not a globally terminal state.
+     */
+    @Test
+    public void testNotTerminationWithoutGloballyTerminalState() throws Exception {
+        final MiniDispatcher miniDispatcher =
+                createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
+        miniDispatcher.start();
+
+        try {
+            // wait until we have submitted the job
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+
+            testingJobManagerRunner.completeResultFuture(
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setJobID(jobGraph.getJobID())
+                                    .setState(JobStatus.SUSPENDED)
+                                    .build()));
+
+            miniDispatcher.getShutDownFuture().get(3, TimeUnit.SECONDS);
+            fail("The shutDownFuture should not be done.");
+        } catch (TimeoutException ignored) {
+            // expected: the shut-down future must stay incomplete for a suspended job
+        } finally {
+            RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
+        }
+    }
+
+    /**
      * Tests that the {@link MiniDispatcher} only terminates in {@link
      * ClusterEntrypoint.ExecutionMode#NORMAL} after it has served the {@link
      * org.apache.flink.runtime.jobmaster.JobResult} once.