You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/08 14:21:44 UTC
[flink] 10/14: [FLINK-25817] Add LocalRecoveryITCase
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 990589c30a73a27412e2f8c7268c0c6448344993
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Dec 29 16:50:54 2021 +0100
[FLINK-25817] Add LocalRecoveryITCase
Adds an integration test for local recovery under process faults using the newly
introduced working directory feature to persist state locally.
---
.../flink/test/recovery/LocalRecoveryITCase.java | 347 +++++++++++++++++++++
1 file changed, 347 insertions(+)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
new file mode 100644
index 0000000..108cf2f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests local recovery by restarting Flink processes. */
+@ExtendWith(TestLoggerExtension.class)
+class LocalRecoveryITCase {
+
+ @TempDir private File tmpDirectory;
+
+ @Test
+ public void testRecoverLocallyFromProcessCrashWithWorkingDirectory() throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.ADDRESS, "localhost");
+ configuration.set(JobManagerOptions.PORT, 0);
+ configuration.set(RestOptions.BIND_PORT, "0");
+ configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
+ configuration.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 1000L);
+ configuration.set(HeartbeatManagerOptions.HEARTBEAT_RPC_FAILURE_THRESHOLD, 1);
+ configuration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, tmpDirectory.getAbsolutePath());
+ configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+ configuration.set(TaskManagerOptions.SLOT_TIMEOUT, Duration.ofSeconds(30L));
+
+ final int parallelism = 3;
+
+ boolean success = false;
+ Collection<TaskManagerProcess> taskManagerProcesses = Collections.emptyList();
+ try (final StandaloneSessionClusterEntrypoint clusterEntrypoint =
+ new StandaloneSessionClusterEntrypoint(configuration)) {
+ clusterEntrypoint.startCluster();
+
+ final Configuration configurationTemplate = new Configuration(configuration);
+ configurationTemplate.set(JobManagerOptions.PORT, clusterEntrypoint.getRpcPort());
+ taskManagerProcesses = startTaskManagerProcesses(parallelism, configurationTemplate);
+
+ final JobClient jobClient = submitJob(parallelism, clusterEntrypoint);
+
+ final long waitingTimeInSeconds = 45L;
+ waitUntilCheckpointCompleted(
+ configuration,
+ clusterEntrypoint.getRestPort(),
+ jobClient.getJobID(),
+ Deadline.fromNow(Duration.ofSeconds(waitingTimeInSeconds)));
+
+ restartTaskManagerProcesses(taskManagerProcesses, parallelism - 1);
+
+ jobClient.getJobExecutionResult().get(waitingTimeInSeconds, TimeUnit.SECONDS);
+
+ success = true;
+ } finally {
+ if (!success) {
+ for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
+ printLogOutput(taskManagerProcess);
+ }
+ }
+
+ for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
+ taskManagerProcess.terminate();
+ }
+ }
+ }
+
+ private static void printLogOutput(TaskManagerProcess taskManagerProcess) {
+ for (TestProcessBuilder.TestProcess testProcess : taskManagerProcess.getProcessHistory()) {
+ AbstractTaskManagerProcessFailureRecoveryTest.printProcessLog(
+ taskManagerProcess.getName(), testProcess);
+ }
+ }
+
+ private static void restartTaskManagerProcesses(
+ Collection<TaskManagerProcess> taskManagerProcesses, int numberRestarts)
+ throws IOException, InterruptedException {
+ Preconditions.checkArgument(numberRestarts <= taskManagerProcesses.size());
+
+ final Iterator<TaskManagerProcess> iterator = taskManagerProcesses.iterator();
+
+ for (int i = 0; i < numberRestarts; i++) {
+ iterator.next().restartProcess(createTaskManagerProcessBuilder());
+ }
+ }
+
+ private static Collection<TaskManagerProcess> startTaskManagerProcesses(
+ int numberTaskManagers, Configuration configurationTemplate) throws IOException {
+ final Collection<TaskManagerProcess> result = new ArrayList<>();
+
+ for (int i = 0; i < numberTaskManagers; i++) {
+ final Configuration effectiveConfiguration = new Configuration(configurationTemplate);
+ effectiveConfiguration.set(
+ TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, "taskManager_" + i);
+
+ final TestProcessBuilder.TestProcess process =
+ startTaskManagerProcess(effectiveConfiguration);
+
+ result.add(new TaskManagerProcess(effectiveConfiguration, process));
+ }
+
+ return result;
+ }
+
+ private static TestProcessBuilder.TestProcess startTaskManagerProcess(
+ Configuration effectiveConfiguration) throws IOException {
+ final TestProcessBuilder taskManagerProcessBuilder = createTaskManagerProcessBuilder();
+ taskManagerProcessBuilder.addConfigAsMainClassArgs(effectiveConfiguration);
+
+ final TestProcessBuilder.TestProcess process = taskManagerProcessBuilder.start();
+ return process;
+ }
+
+ @Nonnull
+ private static TestProcessBuilder createTaskManagerProcessBuilder() throws IOException {
+ return new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName());
+ }
+
+ private static class TaskManagerProcess {
+ private final Configuration configuration;
+ private final List<TestProcessBuilder.TestProcess> processHistory;
+
+ private TaskManagerProcess(
+ Configuration configuration, TestProcessBuilder.TestProcess process) {
+ this.configuration = configuration;
+ this.processHistory = new ArrayList<>();
+ processHistory.add(process);
+ }
+
+ Iterable<TestProcessBuilder.TestProcess> getProcessHistory() {
+ return processHistory;
+ }
+
+ void restartProcess(TestProcessBuilder builder) throws IOException, InterruptedException {
+ final TestProcessBuilder.TestProcess runningProcess = getRunningProcess();
+ runningProcess.destroy();
+ runningProcess.getProcess().waitFor();
+
+ builder.addConfigAsMainClassArgs(configuration);
+ final TestProcessBuilder.TestProcess restartedProcess = builder.start();
+
+ processHistory.add(restartedProcess);
+ }
+
+ private TestProcessBuilder.TestProcess getRunningProcess() {
+ return processHistory.get(processHistory.size() - 1);
+ }
+
+ public String getName() {
+ return configuration.get(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID);
+ }
+
+ public void terminate() {
+ getRunningProcess().destroy();
+ }
+ }
+
+ private void waitUntilCheckpointCompleted(
+ Configuration configuration, int restPort, JobID jobId, Deadline deadline)
+ throws Exception {
+ final RestClient restClient = new RestClient(configuration, Executors.directExecutor());
+ final JobMessageParameters messageParameters = new JobMessageParameters();
+ messageParameters.jobPathParameter.resolve(jobId);
+
+ CommonTestUtils.waitUntilCondition(
+ () -> {
+ final CheckpointingStatistics checkpointingStatistics =
+ restClient
+ .sendRequest(
+ "localhost",
+ restPort,
+ CheckpointingStatisticsHeaders.getInstance(),
+ messageParameters,
+ EmptyRequestBody.getInstance())
+ .join();
+ return checkpointingStatistics.getCounts().getNumberCompletedCheckpoints() > 0;
+ },
+ deadline);
+ }
+
+ private JobClient submitJob(
+ int parallelism, StandaloneSessionClusterEntrypoint clusterEntrypoint)
+ throws Exception {
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.createRemoteEnvironment(
+ "localhost", clusterEntrypoint.getRestPort(), new Configuration());
+ env.setParallelism(parallelism);
+
+ env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
+
+ env.addSource(new LocalRecoverySource()).keyBy(x -> x).addSink(new DiscardingSink<>());
+ final JobClient jobClient = env.executeAsync();
+ return jobClient;
+ }
+
+ private static class LocalRecoverySource extends RichParallelSourceFunction<Integer>
+ implements CheckpointedFunction {
+ private volatile boolean running = true;
+
+ private transient ListState<TaskNameAllocationID> previousAllocations;
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+ while (running) {
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(1);
+ }
+
+ Thread.sleep(5L);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {}
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
+ String allocationId = runtimeContext.getAllocationIDAsString();
+ // Pattern of the name: "Flat Map -> Sink: Unnamed (4/4)#0". Remove "#0" part:
+ String myName = runtimeContext.getTaskNameWithSubtasks().split("#")[0];
+
+ ListStateDescriptor<TaskNameAllocationID> previousAllocationsStateDescriptor =
+ new ListStateDescriptor<>("sourceState", TaskNameAllocationID.class);
+ previousAllocations =
+ context.getOperatorStateStore()
+ .getUnionListState(previousAllocationsStateDescriptor);
+
+ if (context.isRestored()) {
+ final Iterable<TaskNameAllocationID> taskNameAllocationIds =
+ previousAllocations.get();
+
+ Optional<TaskNameAllocationID> optionalMyTaskNameAllocationId = Optional.empty();
+
+ for (TaskNameAllocationID taskNameAllocationId : taskNameAllocationIds) {
+ if (taskNameAllocationId.getName().equals(myName)) {
+ optionalMyTaskNameAllocationId = Optional.of(taskNameAllocationId);
+ break;
+ }
+ }
+
+ final TaskNameAllocationID myTaskNameAllocationId =
+ optionalMyTaskNameAllocationId.orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Could not find corresponding TaskNameAllocationID information."));
+
+ assertThat(myTaskNameAllocationId.getAllocationId())
+ .withFailMessage(
+ "The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.",
+ allocationId, myTaskNameAllocationId.getAllocationId())
+ .isEqualTo(allocationId);
+ // terminate
+ running = false;
+ }
+
+ previousAllocations.clear();
+ previousAllocations.add(new TaskNameAllocationID(myName, allocationId));
+ }
+ }
+
+ private static class TaskNameAllocationID {
+ private final String name;
+ private final String allocationId;
+
+ private TaskNameAllocationID(String name, String allocationId) {
+ this.name = name;
+ this.allocationId = allocationId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getAllocationId() {
+ return allocationId;
+ }
+ }
+}