You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/01/29 15:40:41 UTC
[flink] 01/02: [FLINK-25486][Runtime/Coordination] Fix the bug that Flink loses state when the ZooKeeper leader changes
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8ba13f37afb9164f3bb17de78c4b0d85b1633638
Author: liujiangang <li...@kuaishou.com>
AuthorDate: Fri Jan 7 18:28:15 2022 +0800
[FLINK-25486][Runtime/Coordination] Fix the bug that Flink loses state when the ZooKeeper leader changes
This closes #18296.
---
.../flink/runtime/dispatcher/MiniDispatcher.java | 21 ++-
.../runtime/dispatcher/JobDispatcherITCase.java | 210 +++++++++++++++++++++
.../runtime/dispatcher/MiniDispatcherTest.java | 33 ++++
3 files changed, 258 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index bc5776e..76afe73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -34,6 +35,7 @@ import org.apache.flink.util.FlinkException;
import javax.annotation.Nullable;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -106,8 +108,12 @@ public class MiniDispatcher extends Dispatcher {
? ApplicationStatus.FAILED
: ApplicationStatus.SUCCEEDED;
- log.info("Shutting down cluster because someone retrieved the job result.");
- shutDownFuture.complete(status);
+ if (!ApplicationStatus.UNKNOWN.equals(result.getApplicationStatus())) {
+ log.info(
+ "Shutting down cluster because someone retrieved the job result"
+ + " and the status is globally terminal.");
+ shutDownFuture.complete(status);
+ }
});
} else {
log.info("Not shutting down cluster after someone retrieved the job result.");
@@ -128,16 +134,19 @@ public class MiniDispatcher extends Dispatcher {
executionGraphInfo.getArchivedExecutionGraph();
final CleanupJobState cleanupHAState = super.jobReachedTerminalState(executionGraphInfo);
- if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
+ JobStatus jobStatus =
+ Objects.requireNonNull(
+ archivedExecutionGraph.getState(), "JobStatus should not be null here.");
+ if (jobStatus.isGloballyTerminalState()
+ && (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) {
// shut down if job is cancelled or we don't have to wait for the execution result
// retrieval
log.info(
"Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}",
- archivedExecutionGraph.getState(),
+ jobStatus,
jobCancelled,
executionMode);
- shutDownFuture.complete(
- ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
+ shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
}
return cleanupHAState;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
new file mode 100644
index 0000000..f565960
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Integration test for FLINK-25486: a job running under a job-mode dispatcher must recover from
+ * its latest checkpoint after the dispatcher loses and then regains leadership, instead of the
+ * cluster shutting down and the job's state being lost.
+ */
+public class JobDispatcherITCase extends TestLogger {
+
+ /** Overall deadline shared by the waits in the test. */
+ private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+ /** Holds the serialized {@link JobGraph} file that the {@link FileJobGraphRetriever} reads. */
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ /**
+ * Creates a supplier of component factories whose dispatcher runs in job mode, retrieving the
+ * {@link JobGraph} from the file configured in {@code configuration}.
+ *
+ * <p>An {@link IOException} thrown while creating the {@link FileJobGraphRetriever} is rethrown
+ * wrapped in a {@link RuntimeException}, because a {@link Supplier} cannot throw checked
+ * exceptions.
+ */
+ private Supplier<DispatcherResourceManagerComponentFactory>
+ createJobModeDispatcherResourceManagerComponentFactorySupplier(
+ Configuration configuration) {
+ return () -> {
+ try {
+ return new DefaultDispatcherResourceManagerComponentFactory(
+ new DefaultDispatcherRunnerFactory(
+ JobDispatcherLeaderProcessFactoryFactory.create(
+ FileJobGraphRetriever.createFrom(configuration, null))),
+ StandaloneResourceManagerFactory.getInstance(),
+ JobRestEndpointFactory.INSTANCE);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+
+ /**
+ * Runs a checkpointing job, revokes and re-grants dispatcher leadership, and checks that the
+ * job is running again with a restored checkpoint. Cancelling the job afterwards must shut the
+ * cluster down.
+ */
+ @Test
+ public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception {
+ final Deadline deadline = Deadline.fromNow(TIMEOUT);
+ final Configuration configuration = new Configuration();
+ // HA mode is set to ZooKeeper in the configuration, but the cluster is given the embedded
+ // HA services below so that dispatcher leadership can be revoked and granted manually.
+ configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+ final TestingMiniClusterConfiguration clusterConfiguration =
+ TestingMiniClusterConfiguration.newBuilder()
+ .setConfiguration(configuration)
+ .build();
+ final EmbeddedHaServicesWithLeadershipControl haServices =
+ new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+ // Copy of the cluster configuration into which generateJobGraph writes the job graph file
+ // path; it is handed to the dispatcher component factory.
+ Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration());
+ long checkpointInterval = 500;
+ JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);
+
+ final TestingMiniCluster.Builder clusterBuilder =
+ TestingMiniCluster.newBuilder(clusterConfiguration)
+ .setHighAvailabilityServicesSupplier(() -> haServices)
+ .setDispatcherResourceManagerComponentFactorySupplier(
+ createJobModeDispatcherResourceManagerComponentFactorySupplier(
+ newConfiguration));
+
+ try (final MiniCluster cluster = clusterBuilder.build()) {
+ // start mini cluster and submit the job
+ cluster.start();
+
+ // wait until job is running and at least one checkpoint has completed, so that there
+ // is a checkpoint to restore from after the leadership change
+ awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+ CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false);
+ CommonTestUtils.waitUntilCondition(
+ () -> queryCompletedCheckpoints(cluster, jobID) > 0L,
+ Deadline.fromNow(Duration.ofSeconds(30)),
+ checkpointInterval / 2);
+
+ final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobID);
+ haServices.revokeDispatcherLeadership();
+ // Wait for the result requested before the revocation. It must complete with UNKNOWN
+ // status, and waiting for it makes sure the leadership is revoked before it is granted
+ // again (avoiding a race condition).
+ Assertions.assertEquals(
+ ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus());
+
+ haServices.grantDispatcherLeadership();
+
+ // job is suspended, wait until it's running
+ awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+
+ // the new leader must have restored the job from a checkpoint instead of starting
+ // without state
+ assertNotNull(
+ cluster.getArchivedExecutionGraph(jobID)
+ .get()
+ .getCheckpointStatsSnapshot()
+ .getLatestRestoredCheckpoint());
+
+ cluster.cancelJob(jobID);
+
+ // the cluster should shut down automatically once the application completes
+ CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline);
+ }
+ }
+
+ /**
+ * Builds a single-vertex streaming {@link JobGraph} that checkpoints every {@code
+ * checkpointInterval} milliseconds. The graph is serialized to a file in {@link
+ * #TEMPORARY_FOLDER}, and that file's path is stored under {@code JOB_GRAPH_FILE_PATH} in
+ * {@code configuration}, which is mutated.
+ *
+ * <p>NOTE(review): {@code TEMPORARY_FOLDER} is a class rule, and {@code newFile} fails when the
+ * file already exists. This can therefore only be called once per test class run.
+ *
+ * @return the {@link JobID} of the generated job graph
+ */
+ private JobID generateJobGraph(Configuration configuration, long checkpointInterval)
+ throws Exception {
+ final JobVertex jobVertex = new JobVertex("jobVertex");
+ jobVertex.setInvokableClass(
+ AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable.class);
+ jobVertex.setParallelism(1);
+
+ final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
+ CheckpointCoordinatorConfiguration.builder()
+ .setCheckpointInterval(checkpointInterval)
+ .build();
+ final JobCheckpointingSettings checkpointingSettings =
+ new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
+ JobGraph jobGraph =
+ JobGraphBuilder.newStreamingJobGraphBuilder()
+ .addJobVertex(jobVertex)
+ .setJobCheckpointingSettings(checkpointingSettings)
+ .build();
+
+ final Path jobGraphPath =
+ TEMPORARY_FOLDER.newFile(JOB_GRAPH_FILE_PATH.defaultValue()).toPath();
+ try (ObjectOutputStream objectOutputStream =
+ new ObjectOutputStream(Files.newOutputStream(jobGraphPath, CREATE))) {
+ objectOutputStream.writeObject(jobGraph);
+ }
+ configuration.setString(JOB_GRAPH_FILE_PATH.key(), jobGraphPath.toString());
+ return jobGraph.getJobID();
+ }
+
+ /** Returns the number of completed checkpoints in the job's archived execution graph. */
+ private long queryCompletedCheckpoints(MiniCluster miniCluster, JobID jobID)
+ throws InterruptedException, ExecutionException {
+ return miniCluster
+ .getArchivedExecutionGraph(jobID)
+ .get()
+ .getCheckpointStatsSnapshot()
+ .getCounts()
+ .getNumberOfCompletedCheckpoints();
+ }
+
+ /**
+ * Polls until the job reports {@code status} or {@code deadline} expires. A {@link
+ * FlinkJobNotFoundException} counts as "not yet"; any other failure is rethrown.
+ */
+ private static void awaitJobStatus(
+ MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline)
+ throws Exception {
+ CommonTestUtils.waitUntilCondition(
+ () -> {
+ try {
+ return cluster.getJobStatus(jobId).get() == status;
+ } catch (ExecutionException e) {
+ if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class)
+ .isPresent()) {
+ // the job may not have been submitted yet
+ return false;
+ }
+ throw e;
+ }
+ },
+ deadline);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index af40e71..811f303 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -58,11 +58,13 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
/** Tests for the {@link MiniDispatcher}. */
public class MiniDispatcherTest extends TestLogger {
@@ -202,6 +204,37 @@ public class MiniDispatcherTest extends TestLogger {
}
/**
+ * Tests that in detached mode, the {@link MiniDispatcher} does not complete its shut down future
+ * when the job terminates with a status that is not globally terminal (here {@link
+ * JobStatus#SUSPENDED}).
+ */
+ @Test
+ public void testNotTerminationWithoutGloballyTerminalState() throws Exception {
+ final MiniDispatcher miniDispatcher =
+ createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
+ miniDispatcher.start();
+
+ try {
+ // wait until we have submitted the job
+ final TestingJobManagerRunner testingJobManagerRunner =
+ testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+
+ // SUSPENDED is not a globally terminal state, so the dispatcher must not shut down
+ testingJobManagerRunner.completeResultFuture(
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobGraph.getJobID())
+ .setState(JobStatus.SUSPENDED)
+ .build()));
+
+ // the shut down future is expected to stay incomplete, i.e. this get must time out
+ miniDispatcher.getShutDownFuture().get(3, TimeUnit.SECONDS);
+ fail("The shutDownFuture should not be done.");
+ } catch (TimeoutException ignored) {
+ // expected: the shut down future was not completed within the waiting period
+ } finally {
+ RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
+ }
+ }
+
+ /**
* Tests that the {@link MiniDispatcher} only terminates in {@link
* ClusterEntrypoint.ExecutionMode#NORMAL} after it has served the {@link
* org.apache.flink.runtime.jobmaster.JobResult} once.