You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/18 20:17:34 UTC

[flink] branch release-1.12 updated (6b53f8b -> a153976)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6b53f8b  [FLINK-22494][runtime] Adds PossibleInconsistentStateException handling to CheckpointCoordinator
     new ee34945  [hotfix][tests] Move test checkpoint component classes to upper level
     new a153976  [FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures on recovery

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../DefaultCompletedCheckpointStore.java           |  65 +------
 .../DefaultCompletedCheckpointStoreTest.java       |  45 +----
 .../ExecutionGraphCheckpointCoordinatorTest.java   |  86 ---------
 .../checkpoint/TestingCheckpointIDCounter.java     |  55 ++++++
 .../TestingCheckpointRecoveryFactory.java          |  26 +--
 .../TestingCompletedCheckpointStore.java           |  77 ++++++++
 ...oKeeperCompletedCheckpointStoreMockitoTest.java |  26 +--
 .../test/checkpointing/CheckpointStoreITCase.java  | 207 +++++++++++++++++++++
 8 files changed, 370 insertions(+), 217 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
 copy flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockFunctionSnapshotContext.java => flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java (51%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java

[flink] 01/02: [hotfix][tests] Move test checkpoint component classes to upper level

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ee349451fc592991cccf3a4a9391fc38cc433739
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Feb 17 10:46:35 2021 +0100

    [hotfix][tests] Move test checkpoint component classes to upper level
---
 .../ExecutionGraphCheckpointCoordinatorTest.java   | 86 ----------------------
 .../checkpoint/TestingCheckpointIDCounter.java     | 55 ++++++++++++++
 .../TestingCheckpointRecoveryFactory.java          | 44 +++++++++++
 .../TestingCompletedCheckpointStore.java           | 77 +++++++++++++++++++
 4 files changed, 176 insertions(+), 86 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 6f0ec4b..5e08c4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -38,7 +38,6 @@ import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.Matchers.is;
@@ -168,89 +167,4 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
 
         return executionGraph;
     }
-
-    private static final class TestingCheckpointIDCounter implements CheckpointIDCounter {
-
-        private final CompletableFuture<JobStatus> shutdownStatus;
-
-        private TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
-            this.shutdownStatus = shutdownStatus;
-        }
-
-        @Override
-        public void start() {}
-
-        @Override
-        public void shutdown(JobStatus jobStatus) {
-            shutdownStatus.complete(jobStatus);
-        }
-
-        @Override
-        public long getAndIncrement() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public long get() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public void setCount(long newId) {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-    }
-
-    private static final class TestingCompletedCheckpointStore implements CompletedCheckpointStore {
-
-        private final CompletableFuture<JobStatus> shutdownStatus;
-
-        private TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
-            this.shutdownStatus = shutdownStatus;
-        }
-
-        @Override
-        public void recover() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public void addCheckpoint(
-                CompletedCheckpoint checkpoint,
-                CheckpointsCleaner checkpointsCleaner,
-                Runnable postCleanup) {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public void shutdown(
-                JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) {
-            shutdownStatus.complete(jobStatus);
-        }
-
-        @Override
-        public List<CompletedCheckpoint> getAllCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public int getNumberOfRetainedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public int getMaxNumberOfRetainedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public boolean requiresExternalizedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
new file mode 100644
index 0000000..79df955
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link CheckpointIDCounter} implementation for testing the shutdown behavior. */
+public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
+
+    private final CompletableFuture<JobStatus> shutdownStatus;
+
+    public TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
+        this.shutdownStatus = shutdownStatus;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void shutdown(JobStatus jobStatus) {
+        shutdownStatus.complete(jobStatus);
+    }
+
+    @Override
+    public long getAndIncrement() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public long get() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public void setCount(long newId) {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..cda32d6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+/** A {@link CheckpointRecoveryFactory} that pre-defined checkpointing components. */
+public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+
+    private final CompletedCheckpointStore store;
+    private final CheckpointIDCounter counter;
+
+    public TestingCheckpointRecoveryFactory(
+            CompletedCheckpointStore store, CheckpointIDCounter counter) {
+        this.store = store;
+        this.counter = counter;
+    }
+
+    @Override
+    public CompletedCheckpointStore createCheckpointStore(
+            JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) {
+        return store;
+    }
+
+    @Override
+    public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+        return counter;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
new file mode 100644
index 0000000..4182330
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link CompletedCheckpointStore} implementation for testing the shutdown behavior. */
+public final class TestingCompletedCheckpointStore implements CompletedCheckpointStore {
+
+    private final CompletableFuture<JobStatus> shutdownStatus;
+
+    public TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
+        this.shutdownStatus = shutdownStatus;
+    }
+
+    @Override
+    public void recover() {}
+
+    @Override
+    public void addCheckpoint(
+            CompletedCheckpoint checkpoint,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanup) {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
+        return null;
+    }
+
+    @Override
+    public void shutdown(
+            JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup)
+            throws Exception {
+        shutdownStatus.complete(jobStatus);
+    }
+
+    @Override
+    public List<CompletedCheckpoint> getAllCheckpoints() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public int getNumberOfRetainedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public int getMaxNumberOfRetainedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public boolean requiresExternalizedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+}

[flink] 02/02: [FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures on recovery

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a153976c629ab73b5ee507807c91145a2536436d
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed May 5 18:59:14 2021 +0200

    [FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures on recovery
    
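    Ignoring such failures and running with an incomplete
    set of checkpoints can lead to consistency violations
    (for example, shared incremental state that is referenced
    only by a checkpoint that could not be read may be deleted).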
    
    Instead, transient failures should be mitigated by
    automatic job restart.
---
 .../DefaultCompletedCheckpointStore.java           |  65 +------
 .../DefaultCompletedCheckpointStoreTest.java       |  45 +----
 ...oKeeperCompletedCheckpointStoreMockitoTest.java |  26 +--
 .../test/checkpointing/CheckpointStoreITCase.java  | 207 +++++++++++++++++++++
 4 files changed, 223 insertions(+), 120 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 98549b6..97210f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -141,71 +141,20 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
             return;
         }
 
-        // Try and read the state handles from storage. We try until we either successfully read
-        // all of them or when we reach a stable state, i.e. when we successfully read the same set
-        // of checkpoints in two tries. We do it like this to protect against transient outages
-        // of the checkpoint store (for example a DFS): if the DFS comes online midway through
-        // reading a set of checkpoints we would run the risk of reading only a partial set
-        // of checkpoints while we could in fact read the other checkpoints as well if we retried.
-        // Waiting until a stable state protects against this while also being resilient against
-        // checkpoints being actually unreadable.
-        //
-        // These considerations are also important in the scope of incremental checkpoints, where
-        // we use ref-counting for shared state handles and might accidentally delete shared state
-        // of checkpoints that we don't read due to transient storage outages.
-        final List<CompletedCheckpoint> lastTryRetrievedCheckpoints =
-                new ArrayList<>(numberOfInitialCheckpoints);
         final List<CompletedCheckpoint> retrievedCheckpoints =
                 new ArrayList<>(numberOfInitialCheckpoints);
-        Exception retrieveException = null;
-        do {
-            LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
-
-            lastTryRetrievedCheckpoints.clear();
-            lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
-
-            retrievedCheckpoints.clear();
-
-            for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
-                    initialCheckpoints) {
-
-                CompletedCheckpoint completedCheckpoint;
-
-                try {
-                    completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
-                    if (completedCheckpoint != null) {
-                        retrievedCheckpoints.add(completedCheckpoint);
-                    }
-                } catch (Exception e) {
-                    LOG.warn(
-                            "Could not retrieve checkpoint, not adding to list of recovered checkpoints.",
-                            e);
-                    retrieveException = e;
-                }
-            }
+        LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
 
-        } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
-                && !CompletedCheckpoint.checkpointsMatch(
-                        lastTryRetrievedCheckpoints, retrievedCheckpoints));
+        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
+                initialCheckpoints) {
+            retrievedCheckpoints.add(
+                    checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
+        }
 
         // Clear local handles in order to prevent duplicates on recovery. The local handles should
-        // reflect
-        // the state handle store contents.
+        // reflect the state handle store contents.
         completedCheckpoints.clear();
         completedCheckpoints.addAll(retrievedCheckpoints);
-
-        if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
-            throw new FlinkException(
-                    "Could not read any of the "
-                            + numberOfInitialCheckpoints
-                            + " checkpoints from storage.",
-                    retrieveException);
-        } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
-            LOG.warn(
-                    "Could only fetch {} of {} checkpoints from storage.",
-                    completedCheckpoints.size(),
-                    numberOfInitialCheckpoints);
-        }
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index d14e834..4d195cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -152,7 +153,7 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
         assertThat(checkpointIds, contains(1L, 2L, 3L));
     }
 
-    /** We got an {@link IOException} when retrieving checkpoint 2. It should be skipped. */
+    /** We got an {@link IOException} when retrieving checkpoint 2. It should NOT be skipped. */
     @Test
     public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception {
         final long corruptCkpId = 2L;
@@ -169,45 +170,15 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
 
-        completedCheckpointStore.recover();
-
-        final List<CompletedCheckpoint> recoveredCompletedCheckpoint =
-                completedCheckpointStore.getAllCheckpoints();
-        assertThat(recoveredCompletedCheckpoint.size(), is(2));
-        final List<Long> checkpointIds =
-                recoveredCompletedCheckpoint.stream()
-                        .map(CompletedCheckpoint::getCheckpointID)
-                        .collect(Collectors.toList());
-        // Checkpoint 2 should be skipped.
-        assertThat(checkpointIds, contains(1L, 3L));
-    }
-
-    /**
-     * {@link DefaultCompletedCheckpointStore#recover()} should throw exception when all the
-     * checkpoints retrieved failed while the checkpoint pointers are not empty.
-     */
-    @Test
-    public void testRecoverFailedWhenRetrieveCheckpointAllFailed() {
-        final int ckpNum = 3;
-        checkpointStorageHelper.setRetrieveStateFunction(
-                (state) -> {
-                    throw new IOException(
-                            "Failed to retrieve checkpoint " + state.getCheckpointID());
-                });
-
-        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
-                builder.setGetAllSupplier(() -> createStateHandles(ckpNum)).build();
-        final CompletedCheckpointStore completedCheckpointStore =
-                createCompletedCheckpointStore(stateHandleStore);
-
         try {
             completedCheckpointStore.recover();
-            fail("We should get an exception when retrieving state failed.");
-        } catch (Exception ex) {
-            final String errMsg =
-                    "Could not read any of the " + ckpNum + " checkpoints from storage.";
-            assertThat(ex, FlinkMatchers.containsMessage(errMsg));
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, IOException.class).isPresent()) {
+                return;
+            }
+            throw e;
         }
+        fail();
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index af8c091..b3ca09d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -39,7 +39,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -64,8 +63,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
             ZooKeeperCheckpointStoreUtil.INSTANCE;
 
     /**
-     * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
-     * and ignores those which cannot be retrieved via their state handles.
+     * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper.
      *
      * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
      */
@@ -81,11 +79,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         expectedCheckpointIds.add(1L);
         expectedCheckpointIds.add(2L);
 
-        final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle =
-                mock(RetrievableStateHandle.class);
-        when(failingRetrievableStateHandle.retrieveState())
-                .thenThrow(new IOException("Test exception"));
-
         final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 =
                 mock(RetrievableStateHandle.class);
         when(retrievableStateHandle1.retrieveState())
@@ -121,9 +114,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
                                         new TestCompletedCheckpointStorageLocation())));
 
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
 
         final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
         final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
@@ -206,10 +197,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         // check that we did not discard any of the state handles
         verify(retrievableStateHandle1, never()).discardState();
         verify(retrievableStateHandle2, never()).discardState();
-
-        // Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-        // are subsumed should they be discarded.
-        verify(failingRetrievableStateHandle, never()).discardState();
     }
 
     /**
@@ -230,11 +217,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         expectedCheckpointIds.add(1L);
         expectedCheckpointIds.add(2L);
 
-        final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle =
-                mock(RetrievableStateHandle.class);
-        when(failingRetrievableStateHandle.retrieveState())
-                .thenThrow(new IOException("Test exception"));
-
         final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 =
                 mock(RetrievableStateHandle.class);
         when(retrievableStateHandle1.retrieveState())
@@ -268,9 +250,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
                                         new TestCompletedCheckpointStorageLocation())));
 
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
 
         final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
         final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
@@ -353,9 +333,5 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         // check that we did not discard any of the state handles
         verify(retrievableStateHandle1, never()).discardState();
         verify(retrievableStateHandle2, never()).discardState();
-
-        // Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-        // are subsumed should they be discarded.
-        verify(failingRetrievableStateHandle, never()).discardState();
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
new file mode 100644
index 0000000..1ad5a8b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Test that failure on recovery leads to job restart if configured, so that transient recovery
+ * failures can be mitigated.
+ */
+public class CheckpointStoreITCase extends TestLogger {
+
+    @ClassRule
+    public static final MiniClusterWithClientResource CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(
+                                    new Configuration()
+                                            .set(
+                                                    HighAvailabilityOptions.HA_MODE,
+                                                    TestingHAFactory.class.getName()))
+                            .build());
+
+    @Before
+    public void init() {
+        FailingStore.reset();
+        FailingMapper.reset();
+    }
+
+    @Test
+    public void testRestartOnRecoveryFailure() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10);
+        env.setRestartStrategy(fixedDelayRestart(2 /* failure on processing + on recovery */, 0));
+        env.addSource(emitUntil(() -> FailingStore.recovered && FailingMapper.failedAndProcessed))
+                .map(new FailingMapper())
+                .addSink(new DiscardingSink<>());
+        env.execute();
+
+        checkState(FailingStore.recovered && FailingMapper.failedAndProcessed);
+    }
+
+    /** Throws on the first record and passes every later record through unchanged. */
+    private static class FailingMapper implements MapFunction<Integer, Integer> {
+        private static volatile boolean failed = false;
+        private static volatile boolean failedAndProcessed = false;
+
+        public static void reset() {
+            failed = failedAndProcessed = false;
+        }
+
+        @Override
+        public Integer map(Integer element) throws Exception {
+            if (!failed) {
+                failed = true;
+                throw new RuntimeException("Injected processing failure");
+            }
+            failedAndProcessed = true;
+            return element;
+        }
+    }
+
+    /** Creates embedded HA services whose recovery factory wraps a {@code FailingStore}. */
+    public static class TestingHAFactory implements HighAvailabilityServicesFactory {
+        @Override
+        public HighAvailabilityServices createHAServices(
+                Configuration configuration, Executor executor) {
+            return new EmbeddedHaServices(Executors.directExecutor()) {
+                @Override
+                public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+                    return new TestingCheckpointRecoveryFactory(
+                            new FailingStore(),
+                            new TestingCheckpointIDCounter(new CompletableFuture<>()));
+                }
+            };
+        }
+    }
+
+    /** Store whose second {@link #recover()} call fails; the first and later calls succeed. */
+    private static class FailingStore implements CompletedCheckpointStore {
+        private static volatile boolean started = false;
+        private static volatile boolean failed = false;
+        private static volatile boolean recovered = false;
+
+        public static void reset() {
+            started = failed = recovered = false;
+        }
+
+        @Override
+        public void recover() throws Exception {
+            if (!started) {
+                started = true;
+            } else if (!failed) {
+                failed = true;
+                throw new RuntimeException("Injected recovery failure");
+            } else {
+                recovered = true;
+            }
+        }
+
+        @Override
+        public void addCheckpoint(
+                CompletedCheckpoint checkpoint,
+                CheckpointsCleaner checkpointsCleaner,
+                Runnable postCleanup) {}
+
+        @Override
+        public void shutdown(
+                JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) {}
+
+        @Override
+        public List<CompletedCheckpoint> getAllCheckpoints() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public int getNumberOfRetainedCheckpoints() {
+            return 0;
+        }
+
+        @Override
+        public int getMaxNumberOfRetainedCheckpoints() {
+            return 1;
+        }
+
+        @Override
+        public boolean requiresExternalizedCheckpoints() {
+            return false;
+        }
+    }
+
+    /** Creates a source that emits zeros until {@code until} returns true or it is cancelled. */
+    private static SourceFunction<Integer> emitUntil(SerializableSupplier<Boolean> until) {
+        return new SourceFunction<Integer>() {
+            private volatile boolean running = true;
+
+            @Override
+            public void run(SourceContext<Integer> ctx) {
+                while (running && !until.get()) {
+                    synchronized (ctx.getCheckpointLock()) {
+                        ctx.collect(0);
+                    }
+                    // Sleep outside the checkpoint lock so that checkpoints are not blocked.
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        ExceptionUtils.rethrow(e);
+                    }
+                }
+            }
+
+            @Override
+            public void cancel() {
+                running = false;
+            }
+        };
+    }
+}