You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/08 14:21:44 UTC

[flink] 10/14: [FLINK-25817] Add LocalRecoveryITCase

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 990589c30a73a27412e2f8c7268c0c6448344993
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Dec 29 16:50:54 2021 +0100

    [FLINK-25817] Add LocalRecoveryITCase
    
    Adds an integration test for local recovery under process faults using the newly
    introduced working directory feature to persist state locally.
---
 .../flink/test/recovery/LocalRecoveryITCase.java   | 347 +++++++++++++++++++++
 1 file changed, 347 insertions(+)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
new file mode 100644
index 0000000..108cf2f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/LocalRecoveryITCase.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests local recovery by restarting Flink processes. */
+@ExtendWith(TestLoggerExtension.class)
+class LocalRecoveryITCase {
+
+    @TempDir private File tmpDirectory; // temp dir; base of the process working directories
+
+    /**
+     * Runs a checkpointed job on a standalone session cluster with external TaskManager processes,
+     * restarts all but one of these processes after the first completed checkpoint and waits for
+     * the job to finish. The job's source fails on restore if it is not deployed to the allocation
+     * it was running in before, see {@link LocalRecoverySource}.
+     */
+    @Test
+    public void testRecoverLocallyFromProcessCrashWithWorkingDirectory() throws Exception {
+        final int parallelism = 3;
+        final Duration timeout = Duration.ofSeconds(45L);
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.ADDRESS, "localhost");
+        // The ports which are actually used are queried from the cluster entrypoint below.
+        configuration.set(JobManagerOptions.PORT, 0);
+        configuration.set(RestOptions.BIND_PORT, "0");
+        configuration.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 1000L);
+        configuration.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, 10000L);
+        configuration.set(HeartbeatManagerOptions.HEARTBEAT_RPC_FAILURE_THRESHOLD, 1);
+        configuration.set(TaskManagerOptions.SLOT_TIMEOUT, Duration.ofSeconds(30L));
+        // Local recovery with the process working directories located in the temporary folder.
+        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, true);
+        configuration.set(ClusterOptions.PROCESS_WORKING_DIR_BASE, tmpDirectory.getAbsolutePath());
+
+        boolean success = false;
+        Collection<TaskManagerProcess> taskManagerProcesses = Collections.emptyList();
+        try (final StandaloneSessionClusterEntrypoint clusterEntrypoint =
+                new StandaloneSessionClusterEntrypoint(configuration)) {
+            clusterEntrypoint.startCluster();
+
+            // The TaskManager processes are configured with the RPC port of the started cluster.
+            final Configuration configurationTemplate = new Configuration(configuration);
+            configurationTemplate.set(JobManagerOptions.PORT, clusterEntrypoint.getRpcPort());
+            taskManagerProcesses = startTaskManagerProcesses(parallelism, configurationTemplate);
+
+            final JobClient jobClient = submitJob(parallelism, clusterEntrypoint);
+            final int restPort = clusterEntrypoint.getRestPort();
+
+            waitUntilCheckpointCompleted(
+                    configuration, restPort, jobClient.getJobID(), Deadline.fromNow(timeout));
+
+            restartTaskManagerProcesses(taskManagerProcesses, parallelism - 1);
+
+            // The source stops itself once it has been restored, which lets the job finish.
+            jobClient.getJobExecutionResult().get(timeout.getSeconds(), TimeUnit.SECONDS);
+
+            success = true;
+        } finally {
+            // Only dump the process logs if the test did not run through.
+            if (!success) {
+                taskManagerProcesses.forEach(LocalRecoveryITCase::printLogOutput);
+            }
+
+            taskManagerProcesses.forEach(TaskManagerProcess::terminate);
+        }
+    }
+
+    private static void printLogOutput(TaskManagerProcess taskManagerProcess) {
+        for (TestProcessBuilder.TestProcess testProcess : taskManagerProcess.getProcessHistory()) {
+            AbstractTaskManagerProcessFailureRecoveryTest.printProcessLog(
+                    taskManagerProcess.getName(), testProcess);
+        }
+    }
+
+    /**
+     * Restarts the processes of the first {@code numberRestarts} TaskManagers, following the
+     * iteration order of the given collection. Every restart uses a freshly created builder.
+     *
+     * @param taskManagerProcesses TaskManagers to pick the restart candidates from
+     * @param numberRestarts number of TaskManagers to restart; must not exceed the collection size
+     */
+    private static void restartTaskManagerProcesses(
+            Collection<TaskManagerProcess> taskManagerProcesses, int numberRestarts)
+            throws IOException, InterruptedException {
+        Preconditions.checkArgument(numberRestarts <= taskManagerProcesses.size());
+
+        final Iterator<TaskManagerProcess> processes = taskManagerProcesses.iterator();
+        int remainingRestarts = numberRestarts;
+
+        while (remainingRestarts > 0) {
+            final TaskManagerProcess process = processes.next();
+            process.restartProcess(createTaskManagerProcessBuilder());
+            remainingRestarts--;
+        }
+    }
+
+    /**
+     * Starts the given number of TaskManager processes. Each process is started with a copy of the
+     * configuration template in which the resource id is set to {@code taskManager_<index>}.
+     *
+     * <p>If a process cannot be started, then the processes which have been started so far are
+     * terminated before the failure is propagated, because the caller never gets a handle to them.
+     *
+     * @param numberTaskManagers number of processes to start
+     * @param configurationTemplate configuration shared by all processes
+     * @return handles to the started processes, in start order
+     * @throws IOException if one of the processes could not be started
+     */
+    private static Collection<TaskManagerProcess> startTaskManagerProcesses(
+            int numberTaskManagers, Configuration configurationTemplate) throws IOException {
+        final Collection<TaskManagerProcess> result = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < numberTaskManagers; i++) {
+                final Configuration effectiveConfiguration =
+                        new Configuration(configurationTemplate);
+                effectiveConfiguration.set(
+                        TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, "taskManager_" + i);
+
+                final TestProcessBuilder.TestProcess process =
+                        startTaskManagerProcess(effectiveConfiguration);
+
+                result.add(new TaskManagerProcess(effectiveConfiguration, process));
+            }
+        } catch (Throwable t) {
+            // Don't leave orphaned processes behind if the start-up fails half-way.
+            for (TaskManagerProcess startedProcess : result) {
+                startedProcess.terminate();
+            }
+            throw t;
+        }
+
+        return result;
+    }
+
+    /**
+     * Starts a single TaskManager process and passes the given configuration to it as main class
+     * arguments.
+     */
+    private static TestProcessBuilder.TestProcess startTaskManagerProcess(
+            Configuration effectiveConfiguration) throws IOException {
+        final TestProcessBuilder builder = createTaskManagerProcessBuilder();
+        builder.addConfigAsMainClassArgs(effectiveConfiguration);
+
+        return builder.start();
+    }
+
+    @Nonnull
+    private static TestProcessBuilder createTaskManagerProcessBuilder() throws IOException {
+        return new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName());
+    }
+
+    /**
+     * Bookkeeping for a single logical TaskManager: the configuration it is started with and every
+     * process that has been launched for it, in start order. The last entry of the history is the
+     * process that has been started most recently.
+     */
+    private static class TaskManagerProcess {
+        private final Configuration configuration;
+        private final List<TestProcessBuilder.TestProcess> processHistory = new ArrayList<>();
+
+        private TaskManagerProcess(
+                Configuration configuration, TestProcessBuilder.TestProcess process) {
+            this.configuration = configuration;
+            this.processHistory.add(process);
+        }
+
+        /** Returns all processes which have been started for this TaskManager, oldest first. */
+        Iterable<TestProcessBuilder.TestProcess> getProcessHistory() {
+            return processHistory;
+        }
+
+        /**
+         * Destroys the most recent process, waits until it has exited and then starts a new
+         * process with the same configuration using the given builder.
+         */
+        void restartProcess(TestProcessBuilder builder) throws IOException, InterruptedException {
+            final TestProcessBuilder.TestProcess current = getRunningProcess();
+            current.destroy();
+            // Wait for the old process to exit before its replacement is launched.
+            current.getProcess().waitFor();
+
+            builder.addConfigAsMainClassArgs(configuration);
+            processHistory.add(builder.start());
+        }
+
+        private TestProcessBuilder.TestProcess getRunningProcess() {
+            final int lastIndex = processHistory.size() - 1;
+            return processHistory.get(lastIndex);
+        }
+
+        /** Returns the resource id this TaskManager has been configured with. */
+        public String getName() {
+            return configuration.get(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID);
+        }
+
+        /** Destroys the most recently started process. */
+        public void terminate() {
+            getRunningProcess().destroy();
+        }
+    }
+
+    /**
+     * Waits until the checkpointing statistics of the given job, queried via the REST endpoint on
+     * localhost, report at least one completed checkpoint.
+     *
+     * @param configuration configuration used to create the REST client
+     * @param restPort port of the REST endpoint
+     * @param jobId id of the job whose checkpoints are observed
+     * @param deadline deadline handed to {@link CommonTestUtils#waitUntilCondition}
+     * @throws Exception if creating or closing the REST client, or the waiting itself, fails
+     */
+    private void waitUntilCheckpointCompleted(
+            Configuration configuration, int restPort, JobID jobId, Deadline deadline)
+            throws Exception {
+        final JobMessageParameters messageParameters = new JobMessageParameters();
+        messageParameters.jobPathParameter.resolve(jobId);
+
+        // Close the client once the waiting is over so that the test does not leak its resources.
+        try (RestClient restClient = new RestClient(configuration, Executors.directExecutor())) {
+            CommonTestUtils.waitUntilCondition(
+                    () -> {
+                        final CheckpointingStatistics checkpointingStatistics =
+                                restClient
+                                        .sendRequest(
+                                                "localhost",
+                                                restPort,
+                                                CheckpointingStatisticsHeaders.getInstance(),
+                                                messageParameters,
+                                                EmptyRequestBody.getInstance())
+                                        .join();
+                        return checkpointingStatistics.getCounts().getNumberCompletedCheckpoints()
+                                > 0;
+                    },
+                    deadline);
+        }
+    }
+
+    /**
+     * Submits the test job to the REST endpoint of the given cluster without waiting for its
+     * result. The job connects a {@link LocalRecoverySource} via a keyBy to a discarding sink and
+     * uses exactly-once checkpointing with an interval of 100.
+     */
+    private JobClient submitJob(
+            int parallelism, StandaloneSessionClusterEntrypoint clusterEntrypoint)
+            throws Exception {
+        final int restPort = clusterEntrypoint.getRestPort();
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.createRemoteEnvironment(
+                        "localhost", restPort, new Configuration());
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
+        env.addSource(new LocalRecoverySource()).keyBy(x -> x).addSink(new DiscardingSink<>());
+
+        return env.executeAsync();
+    }
+
+    private static class LocalRecoverySource extends RichParallelSourceFunction<Integer>
+            implements CheckpointedFunction {
+        private volatile boolean running = true;
+
+        private transient ListState<TaskNameAllocationID> previousAllocations;
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            while (running) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(1);
+                }
+
+                Thread.sleep(5L);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {}
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
+            String allocationId = runtimeContext.getAllocationIDAsString();
+            // Pattern of the name: "Flat Map -> Sink: Unnamed (4/4)#0". Remove "#0" part:
+            String myName = runtimeContext.getTaskNameWithSubtasks().split("#")[0];
+
+            ListStateDescriptor<TaskNameAllocationID> previousAllocationsStateDescriptor =
+                    new ListStateDescriptor<>("sourceState", TaskNameAllocationID.class);
+            previousAllocations =
+                    context.getOperatorStateStore()
+                            .getUnionListState(previousAllocationsStateDescriptor);
+
+            if (context.isRestored()) {
+                final Iterable<TaskNameAllocationID> taskNameAllocationIds =
+                        previousAllocations.get();
+
+                Optional<TaskNameAllocationID> optionalMyTaskNameAllocationId = Optional.empty();
+
+                for (TaskNameAllocationID taskNameAllocationId : taskNameAllocationIds) {
+                    if (taskNameAllocationId.getName().equals(myName)) {
+                        optionalMyTaskNameAllocationId = Optional.of(taskNameAllocationId);
+                        break;
+                    }
+                }
+
+                final TaskNameAllocationID myTaskNameAllocationId =
+                        optionalMyTaskNameAllocationId.orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Could not find corresponding TaskNameAllocationID information."));
+
+                assertThat(myTaskNameAllocationId.getAllocationId())
+                        .withFailMessage(
+                                "The task was deployed to AllocationID(%s) but it should have been deployed to AllocationID(%s) for local recovery.",
+                                allocationId, myTaskNameAllocationId.getAllocationId())
+                        .isEqualTo(allocationId);
+                // terminate
+                running = false;
+            }
+
+            previousAllocations.clear();
+            previousAllocations.add(new TaskNameAllocationID(myName, allocationId));
+        }
+    }
+
+    /** Pairs the name of a task with the id of the allocation the task has been deployed to. */
+    private static class TaskNameAllocationID {
+        private final String name;
+        private final String allocationId;
+
+        private TaskNameAllocationID(String taskName, String taskAllocationId) {
+            name = taskName;
+            allocationId = taskAllocationId;
+        }
+
+        /** Returns the name of the task. */
+        public String getName() {
+            return this.name;
+        }
+
+        /** Returns the allocation id as a string. */
+        public String getAllocationId() {
+            return this.allocationId;
+        }
+    }
+}