You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/18 20:17:36 UTC

[flink] 02/02: [FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures on recovery

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a153976c629ab73b5ee507807c91145a2536436d
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed May 5 18:59:14 2021 +0200

    [FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures on recovery
    
    Ignoring such failures and running with an incomplete
    set of checkpoints can lead to consistency violation.
    
    Instead, transient failures should be mitigated by
    automatic job restart.
---
 .../DefaultCompletedCheckpointStore.java           |  65 +------
 .../DefaultCompletedCheckpointStoreTest.java       |  45 +----
 ...oKeeperCompletedCheckpointStoreMockitoTest.java |  26 +--
 .../test/checkpointing/CheckpointStoreITCase.java  | 207 +++++++++++++++++++++
 4 files changed, 223 insertions(+), 120 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 98549b6..97210f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -141,71 +141,20 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
             return;
         }
 
-        // Try and read the state handles from storage. We try until we either successfully read
-        // all of them or when we reach a stable state, i.e. when we successfully read the same set
-        // of checkpoints in two tries. We do it like this to protect against transient outages
-        // of the checkpoint store (for example a DFS): if the DFS comes online midway through
-        // reading a set of checkpoints we would run the risk of reading only a partial set
-        // of checkpoints while we could in fact read the other checkpoints as well if we retried.
-        // Waiting until a stable state protects against this while also being resilient against
-        // checkpoints being actually unreadable.
-        //
-        // These considerations are also important in the scope of incremental checkpoints, where
-        // we use ref-counting for shared state handles and might accidentally delete shared state
-        // of checkpoints that we don't read due to transient storage outages.
-        final List<CompletedCheckpoint> lastTryRetrievedCheckpoints =
-                new ArrayList<>(numberOfInitialCheckpoints);
         final List<CompletedCheckpoint> retrievedCheckpoints =
                 new ArrayList<>(numberOfInitialCheckpoints);
-        Exception retrieveException = null;
-        do {
-            LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
-
-            lastTryRetrievedCheckpoints.clear();
-            lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
-
-            retrievedCheckpoints.clear();
-
-            for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
-                    initialCheckpoints) {
-
-                CompletedCheckpoint completedCheckpoint;
-
-                try {
-                    completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
-                    if (completedCheckpoint != null) {
-                        retrievedCheckpoints.add(completedCheckpoint);
-                    }
-                } catch (Exception e) {
-                    LOG.warn(
-                            "Could not retrieve checkpoint, not adding to list of recovered checkpoints.",
-                            e);
-                    retrieveException = e;
-                }
-            }
+        LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
 
-        } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
-                && !CompletedCheckpoint.checkpointsMatch(
-                        lastTryRetrievedCheckpoints, retrievedCheckpoints));
+        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
+                initialCheckpoints) {
+            retrievedCheckpoints.add(
+                    checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
+        }
 
         // Clear local handles in order to prevent duplicates on recovery. The local handles should
-        // reflect
-        // the state handle store contents.
+        // reflect the state handle store contents.
         completedCheckpoints.clear();
         completedCheckpoints.addAll(retrievedCheckpoints);
-
-        if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
-            throw new FlinkException(
-                    "Could not read any of the "
-                            + numberOfInitialCheckpoints
-                            + " checkpoints from storage.",
-                    retrieveException);
-        } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
-            LOG.warn(
-                    "Could only fetch {} of {} checkpoints from storage.",
-                    completedCheckpoints.size(),
-                    numberOfInitialCheckpoints);
-        }
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index d14e834..4d195cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -152,7 +153,7 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
         assertThat(checkpointIds, contains(1L, 2L, 3L));
     }
 
-    /** We got an {@link IOException} when retrieving checkpoint 2. It should be skipped. */
+    /** We got an {@link IOException} when retrieving checkpoint 2. It should NOT be skipped. */
     @Test
     public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception {
         final long corruptCkpId = 2L;
@@ -169,45 +170,15 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
 
-        completedCheckpointStore.recover();
-
-        final List<CompletedCheckpoint> recoveredCompletedCheckpoint =
-                completedCheckpointStore.getAllCheckpoints();
-        assertThat(recoveredCompletedCheckpoint.size(), is(2));
-        final List<Long> checkpointIds =
-                recoveredCompletedCheckpoint.stream()
-                        .map(CompletedCheckpoint::getCheckpointID)
-                        .collect(Collectors.toList());
-        // Checkpoint 2 should be skipped.
-        assertThat(checkpointIds, contains(1L, 3L));
-    }
-
-    /**
-     * {@link DefaultCompletedCheckpointStore#recover()} should throw exception when all the
-     * checkpoints retrieved failed while the checkpoint pointers are not empty.
-     */
-    @Test
-    public void testRecoverFailedWhenRetrieveCheckpointAllFailed() {
-        final int ckpNum = 3;
-        checkpointStorageHelper.setRetrieveStateFunction(
-                (state) -> {
-                    throw new IOException(
-                            "Failed to retrieve checkpoint " + state.getCheckpointID());
-                });
-
-        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
-                builder.setGetAllSupplier(() -> createStateHandles(ckpNum)).build();
-        final CompletedCheckpointStore completedCheckpointStore =
-                createCompletedCheckpointStore(stateHandleStore);
-
         try {
             completedCheckpointStore.recover();
-            fail("We should get an exception when retrieving state failed.");
-        } catch (Exception ex) {
-            final String errMsg =
-                    "Could not read any of the " + ckpNum + " checkpoints from storage.";
-            assertThat(ex, FlinkMatchers.containsMessage(errMsg));
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, IOException.class).isPresent()) {
+                return;
+            }
+            throw e;
         }
+        fail();
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index af8c091..b3ca09d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -39,7 +39,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -64,8 +63,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
             ZooKeeperCheckpointStoreUtil.INSTANCE;
 
     /**
-     * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
-     * and ignores those which cannot be retrieved via their state handles.
+     * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper.
      *
      * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
      */
@@ -81,11 +79,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         expectedCheckpointIds.add(1L);
         expectedCheckpointIds.add(2L);
 
-        final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle =
-                mock(RetrievableStateHandle.class);
-        when(failingRetrievableStateHandle.retrieveState())
-                .thenThrow(new IOException("Test exception"));
-
         final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 =
                 mock(RetrievableStateHandle.class);
         when(retrievableStateHandle1.retrieveState())
@@ -121,9 +114,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
                                         new TestCompletedCheckpointStorageLocation())));
 
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
 
         final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
         final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
@@ -206,10 +197,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         // check that we did not discard any of the state handles
         verify(retrievableStateHandle1, never()).discardState();
         verify(retrievableStateHandle2, never()).discardState();
-
-        // Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-        // are subsumed should they be discarded.
-        verify(failingRetrievableStateHandle, never()).discardState();
     }
 
     /**
@@ -230,11 +217,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         expectedCheckpointIds.add(1L);
         expectedCheckpointIds.add(2L);
 
-        final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle =
-                mock(RetrievableStateHandle.class);
-        when(failingRetrievableStateHandle.retrieveState())
-                .thenThrow(new IOException("Test exception"));
-
         final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 =
                 mock(RetrievableStateHandle.class);
         when(retrievableStateHandle1.retrieveState())
@@ -268,9 +250,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
                                         new TestCompletedCheckpointStorageLocation())));
 
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
 
         final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
         final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
@@ -353,9 +333,5 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         // check that we did not discard any of the state handles
         verify(retrievableStateHandle1, never()).discardState();
         verify(retrievableStateHandle2, never()).discardState();
-
-        // Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-        // are subsumed should they be discarded.
-        verify(failingRetrievableStateHandle, never()).discardState();
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
new file mode 100644
index 0000000..1ad5a8b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Test that failure on recovery leads to job restart if configured, so that transient recovery
+ * failures can are mitigated.
+ */
+public class CheckpointStoreITCase extends TestLogger {
+
+    @ClassRule
+    public static final MiniClusterWithClientResource CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(
+                                    new Configuration()
+                                            .set(
+                                                    HighAvailabilityOptions.HA_MODE,
+                                                    TestingHAFactory.class.getName()))
+                            .build());
+
+    @Before
+    public void init() {
+        FailingStore.reset();
+        FailingMapper.reset();
+    }
+
+    @Test
+    public void testRestartOnRecoveryFailure() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10);
+        env.setRestartStrategy(fixedDelayRestart(2 /* failure on processing + on recovery */, 0));
+        env.addSource(emitUntil(() -> FailingStore.recovered && FailingMapper.failedAndProcessed))
+                .map(new FailingMapper())
+                .addSink(new DiscardingSink<>());
+        env.execute();
+
+        checkState(FailingStore.recovered && FailingMapper.failedAndProcessed);
+    }
+
+    private static class FailingMapper implements MapFunction<Integer, Integer> {
+        private static volatile boolean failed = false;
+        private static volatile boolean failedAndProcessed = false;
+
+        public static void reset() {
+            failed = false;
+            failedAndProcessed = false;
+        }
+
+        @Override
+        public Integer map(Integer element) throws Exception {
+            if (!failed) {
+                failed = true;
+                throw new RuntimeException();
+            } else {
+                failedAndProcessed = true;
+                return element;
+            }
+        }
+    }
+
+    /** TestingHAFactory. */
+    public static class TestingHAFactory implements HighAvailabilityServicesFactory {
+
+        @Override
+        public HighAvailabilityServices createHAServices(
+                Configuration configuration, Executor executor) {
+            return new EmbeddedHaServices(Executors.directExecutor()) {
+
+                @Override
+                public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+                    return new TestingCheckpointRecoveryFactory(
+                            new FailingStore(),
+                            new TestingCheckpointIDCounter(new CompletableFuture<>()));
+                }
+            };
+        }
+    }
+
+    private static class FailingStore implements CompletedCheckpointStore {
+        private static volatile boolean started = false;
+        private static volatile boolean failed = false;
+        private static volatile boolean recovered = false;
+
+        public static void reset() {
+            started = failed = recovered = false;
+        }
+
+        @Override
+        public void recover() throws Exception {
+            if (!started) {
+                started = true;
+            } else if (!failed) {
+                failed = true;
+                throw new RuntimeException();
+            } else if (!recovered) {
+                recovered = true;
+            }
+        }
+
+        @Override
+        public void addCheckpoint(
+                CompletedCheckpoint checkpoint,
+                CheckpointsCleaner checkpointsCleaner,
+                Runnable postCleanup) {}
+
+        @Override
+        public void shutdown(
+                JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup)
+                throws Exception {}
+
+        @Override
+        public List<CompletedCheckpoint> getAllCheckpoints() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public int getNumberOfRetainedCheckpoints() {
+            return 0;
+        }
+
+        @Override
+        public int getMaxNumberOfRetainedCheckpoints() {
+            return 1;
+        }
+
+        @Override
+        public boolean requiresExternalizedCheckpoints() {
+            return false;
+        }
+    }
+
+    private SourceFunction<Integer> emitUntil(SerializableSupplier<Boolean> until) {
+        return new SourceFunction<Integer>() {
+            private volatile boolean running = true;
+
+            @Override
+            public void run(SourceContext<Integer> ctx) {
+                while (running && !until.get()) {
+                    synchronized (ctx.getCheckpointLock()) {
+                        ctx.collect(0);
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            ExceptionUtils.rethrow(e);
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void cancel() {
+                running = false;
+            }
+        };
+    }
+}