You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/22 20:30:03 UTC

[flink] branch master updated (85879bf -> bd91b6c)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 85879bf  [FLINK-21362][coordination] Move State.onEnter() logic into constructor
     new 3897147  [hotfix][checkpointing] Extract CheckpointSubsumeHelper
     new bd8e406  [FLINK-21351][checkpointing] Don't subsume last checkpoint
     new bd91b6c  [hotfix][tests] Remove mock from testAddCheckpointWithFailedRemove

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../checkpoint/CheckpointSubsumeHelper.java        | 106 +++++++++++++++++++++
 .../DefaultCompletedCheckpointStore.java           |  45 ++++-----
 .../EmbeddedCompletedCheckpointStore.java          |   5 +-
 .../StandaloneCompletedCheckpointStore.java        |  10 +-
 .../DefaultCompletedCheckpointStoreTest.java       |  83 +++++++++++++++-
 .../StandaloneCompletedCheckpointStoreTest.java    |  47 ++++-----
 ...oKeeperCompletedCheckpointStoreMockitoTest.java |  71 --------------
 .../ZooKeeperCompletedCheckpointStoreTest.java     |  44 +++++++++
 8 files changed, 283 insertions(+), 128 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java


[flink] 02/03: [FLINK-21351][checkpointing] Don't subsume last checkpoint

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd8e406e547973da55fa47e260a910293637d1d7
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 17 18:40:36 2021 +0100

    [FLINK-21351][checkpointing] Don't subsume last checkpoint
    
    When a savepoint is added to CompletedCheckpointStore,
    all previous checkpoints will be removed if the number to
    retain is 1.
    
    This makes future incremental checkpoints invalid since
    they can refer to the discarded state.
---
 .../checkpoint/CheckpointSubsumeHelper.java        | 60 +++++++++++++---
 .../DefaultCompletedCheckpointStoreTest.java       | 83 +++++++++++++++++++++-
 2 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
index 7d3e7aa..da8e575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -17,11 +17,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.util.function.ThrowingConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Deque;
+import java.util.Iterator;
+import java.util.Optional;
 
 /**
  * Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint
@@ -46,21 +47,60 @@ class CheckpointSubsumeHelper {
     private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
 
     public static void subsume(
-            Deque<CompletedCheckpoint> checkpoints,
-            int numRetain,
-            ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+            Deque<CompletedCheckpoint> checkpoints, int numRetain, SubsumeAction subsumeAction)
             throws Exception {
         if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
             return;
         }
+        CompletedCheckpoint latest = checkpoints.peekLast();
+        Optional<CompletedCheckpoint> latestNotSavepoint = getLatestNotSavepoint(checkpoints);
+        Iterator<CompletedCheckpoint> iterator = checkpoints.iterator();
+        while (checkpoints.size() > numRetain && iterator.hasNext()) {
+            CompletedCheckpoint next = iterator.next();
+            if (canSubsume(next, latest, latestNotSavepoint)) {
+                iterator.remove();
+                try {
+                    subsumeAction.subsume(next);
+                } catch (Exception e) {
+                    LOG.warn("Fail to subsume the old checkpoint.", e);
+                }
+            }
+            // Don't break out from the loop to subsume intermediate savepoints
+        }
+    }
 
-        while (checkpoints.size() > numRetain) {
-            CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
-            try {
-                subsumeAction.accept(completedCheckpoint);
-            } catch (Exception e) {
-                LOG.warn("Fail to subsume the old checkpoint.", e);
+    private static Optional<CompletedCheckpoint> getLatestNotSavepoint(
+            Deque<CompletedCheckpoint> completed) {
+        Iterator<CompletedCheckpoint> descendingIterator = completed.descendingIterator();
+        while (descendingIterator.hasNext()) {
+            CompletedCheckpoint next = descendingIterator.next();
+            if (!next.getProperties().isSavepoint()) {
+                return Optional.of(next);
             }
         }
+        return Optional.empty();
+    }
+
+    private static boolean canSubsume(
+            CompletedCheckpoint next,
+            CompletedCheckpoint latest,
+            Optional<CompletedCheckpoint> latestNonSavepoint) {
+        if (next == latest) {
+            return false;
+        } else if (next.getProperties().isSavepoint()) {
+            return true;
+        } else if (latest.getProperties().isSynchronous()) {
+            // If the job has stopped with a savepoint then it's safe to subsume because no future
+            // snapshots will be taken during this run
+            return true;
+        } else {
+            // Keep the latest non-savepoint; future incremental snapshots may refer to its state
+            return latestNonSavepoint.filter(checkpoint -> checkpoint != next).isPresent();
+        }
+    }
+
+    @FunctionalInterface
+    interface SubsumeAction {
+        void subsume(CompletedCheckpoint checkpoint) throws Exception;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 0430085..bb0578c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.testutils.FlinkMatchers;
@@ -25,6 +26,7 @@ import org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper
 import org.apache.flink.runtime.persistence.TestingStateHandleStore;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -44,8 +47,11 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.asList;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -72,6 +78,56 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
         executorService.shutdownNow();
     }
 
+    @Test
+    public void testAtLeastOneCheckpointRetained() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 3L);
+        CompletedCheckpoint sp2 = getCheckpoint(true, 4L);
+        CompletedCheckpoint sp3 = getCheckpoint(true, 5L);
+        testCheckpointRetention(1, asList(cp1, cp2, sp1, sp2, sp3), asList(cp2, sp3));
+    }
+
+    @Test
+    public void testOlderSavepointSubsumed() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 3L);
+        testCheckpointRetention(1, asList(cp1, sp1, cp2), asList(cp2));
+    }
+
+    @Test
+    public void testSubsumeAfterStoppingWithSavepoint() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
+        CompletedCheckpoint stop = getCheckpoint(CheckpointProperties.forSyncSavepoint(false), 3L);
+        testCheckpointRetention(1, asList(cp1, sp1, stop), asList(stop));
+    }
+
+    @Test
+    public void testNotSubsumedIfNotNeeded() throws Exception {
+        CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+        CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
+        CompletedCheckpoint cp3 = getCheckpoint(false, 3L);
+        testCheckpointRetention(3, asList(cp1, cp2, cp3), asList(cp1, cp2, cp3));
+    }
+
+    private void testCheckpointRetention(
+            int numRetain,
+            List<CompletedCheckpoint> completed,
+            List<CompletedCheckpoint> expectedRetained)
+            throws Exception {
+        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
+                builder.setGetAllSupplier(() -> createStateHandles(3)).build();
+        final CompletedCheckpointStore completedCheckpointStore =
+                createCompletedCheckpointStore(stateHandleStore, numRetain);
+
+        for (CompletedCheckpoint c : completed) {
+            completedCheckpointStore.addCheckpoint(c, new CheckpointsCleaner(), () -> {});
+        }
+        assertEquals(expectedRetained, completedCheckpointStore.getAllCheckpoints());
+    }
+
     /**
      * We have three completed checkpoints(1, 2, 3) in the state handle store. We expect that {@link
      * DefaultCompletedCheckpointStore#recover()} should recover the sorted checkpoints by name.
@@ -298,8 +354,13 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
 
     private CompletedCheckpointStore createCompletedCheckpointStore(
             TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) {
+        return createCompletedCheckpointStore(stateHandleStore, 1);
+    }
+
+    private CompletedCheckpointStore createCompletedCheckpointStore(
+            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int toRetain) {
         return new DefaultCompletedCheckpointStore<>(
-                1,
+                toRetain,
                 stateHandleStore,
                 new CheckpointStoreUtil() {
                     @Override
@@ -314,4 +375,24 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
                 },
                 executorService);
     }
+
+    private CompletedCheckpoint getCheckpoint(boolean isSavepoint, long id) {
+        return getCheckpoint(
+                isSavepoint
+                        ? CheckpointProperties.forSavepoint(false)
+                        : CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+                id);
+    }
+
+    private CompletedCheckpoint getCheckpoint(CheckpointProperties props, long id) {
+        return new CompletedCheckpoint(
+                new JobID(),
+                id,
+                0L,
+                0L,
+                Collections.emptyMap(),
+                Collections.emptyList(),
+                props,
+                new TestCompletedCheckpointStorageLocation());
+    }
 }


[flink] 03/03: [hotfix][tests] Remove mock from testAddCheckpointWithFailedRemove

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd91b6c9d7a3606fd8caee4be62cab2c582f1d53
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Feb 19 16:27:23 2021 +0100

    [hotfix][tests] Remove mock from testAddCheckpointWithFailedRemove
    
    Additionally, remove dead code and check that an
    exception was thrown.
---
 .../StandaloneCompletedCheckpointStoreTest.java    | 47 +++++++-------
 ...oKeeperCompletedCheckpointStoreMockitoTest.java | 71 ----------------------
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 44 ++++++++++++++
 3 files changed, 68 insertions(+), 94 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 54fdb09..e2c619f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -22,23 +22,20 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mock;
 
 /** Tests for basic {@link CompletedCheckpointStore} contract. */
 public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {
@@ -100,24 +97,28 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
         CompletedCheckpointStore store =
                 createCompletedCheckpoints(numCheckpointsToRetain, Executors.directExecutor());
 
-        for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-            CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
-            doReturn(i).when(checkpointToAdd).getCheckpointID();
-            doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-            doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();
-
-            try {
-                store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {});
-
-                // The checkpoint should be in the store if we successfully add it into the store.
-                List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
-                assertTrue(addedCheckpoints.contains(checkpointToAdd));
-            } catch (Exception e) {
-                // The checkpoint should not be in the store if any exception is thrown.
-                List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
-                assertFalse(addedCheckpoints.contains(checkpointToAdd));
-            }
+        CountDownLatch discardAttempted = new CountDownLatch(1);
+        for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
+            CompletedCheckpoint checkpointToAdd =
+                    new CompletedCheckpoint(
+                            new JobID(),
+                            i,
+                            i,
+                            i,
+                            Collections.emptyMap(),
+                            Collections.emptyList(),
+                            CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+                            new TestCompletedCheckpointStorageLocation()) {
+                        @Override
+                        public boolean discardOnSubsume() {
+                            discardAttempted.countDown();
+                            throw new RuntimeException();
+                        }
+                    };
+            // shouldn't fail despite the exception
+            store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {});
         }
+        discardAttempted.await();
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 8b7c9fc..af8c091 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -42,20 +42,15 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -363,70 +358,4 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
         // are subsumed should they be discarded.
         verify(failingRetrievableStateHandle, never()).discardState();
     }
-
-    /**
-     * Tests that the checkpoint does not exist in the store when we fail to add it into the store
-     * (i.e., there exists an exception thrown by the method).
-     */
-    @Test
-    public void testAddCheckpointWithFailedRemove() throws Exception {
-        final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
-        final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
-                mock(RetrievableStateStorageHelper.class);
-
-        ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
-                spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
-
-        doAnswer(
-                        new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
-                            @Override
-                            public RetrievableStateHandle<CompletedCheckpoint> answer(
-                                    InvocationOnMock invocationOnMock) throws Throwable {
-                                CompletedCheckpoint checkpoint =
-                                        (CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
-                                RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle =
-                                        mock(RetrievableStateHandle.class);
-                                when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-                                return retrievableStateHandle;
-                            }
-                        })
-                .when(zookeeperStateHandleStoreMock)
-                .addAndLock(anyString(), any(CompletedCheckpoint.class));
-
-        doThrow(new Exception())
-                .when(zookeeperStateHandleStoreMock)
-                .releaseAndTryRemove(anyString());
-
-        final int numCheckpointsToRetain = 1;
-
-        CompletedCheckpointStore zooKeeperCompletedCheckpointStore =
-                new DefaultCompletedCheckpointStore<>(
-                        numCheckpointsToRetain,
-                        zookeeperStateHandleStoreMock,
-                        zooKeeperCheckpointStoreUtil,
-                        Executors.directExecutor());
-
-        for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-            CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
-            doReturn(i).when(checkpointToAdd).getCheckpointID();
-            doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-            try {
-                zooKeeperCompletedCheckpointStore.addCheckpoint(
-                        checkpointToAdd, new CheckpointsCleaner(), () -> {});
-
-                // The checkpoint should be in the store if we successfully add it into the store.
-                List<CompletedCheckpoint> addedCheckpoints =
-                        zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-                assertTrue(addedCheckpoints.contains(checkpointToAdd));
-            } catch (Exception e) {
-                // The checkpoint should not be in the store if any exception is thrown.
-                List<CompletedCheckpoint> addedCheckpoints =
-                        zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-                assertFalse(addedCheckpoints.contains(checkpointToAdd));
-            }
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0811a7e..e17a4f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
@@ -42,10 +44,13 @@ import org.junit.Test;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
@@ -296,4 +301,43 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
             return 0;
         }
     }
+
+    /**
+     * Tests that adding checkpoints to the store does not fail even if the post-cleanup callback
+     * passed to the store throws an exception.
+     */
+    @Test
+    public void testAddCheckpointWithFailedRemove() throws Exception {
+
+        final int numCheckpointsToRetain = 1;
+        final Configuration configuration = new Configuration();
+        configuration.setString(
+                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+
+        final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+        final CompletedCheckpointStore store = createZooKeeperCheckpointStore(client);
+
+        CountDownLatch discardAttempted = new CountDownLatch(1);
+        for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
+            CompletedCheckpoint checkpointToAdd =
+                    new CompletedCheckpoint(
+                            new JobID(),
+                            i,
+                            i,
+                            i,
+                            Collections.emptyMap(),
+                            Collections.emptyList(),
+                            CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+                            new TestCompletedCheckpointStorageLocation());
+            // shouldn't fail despite the exception
+            store.addCheckpoint(
+                    checkpointToAdd,
+                    new CheckpointsCleaner(),
+                    () -> {
+                        discardAttempted.countDown();
+                        throw new RuntimeException();
+                    });
+        }
+        discardAttempted.await();
+    }
 }


[flink] 01/03: [hotfix][checkpointing] Extract CheckpointSubsumeHelper

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 38971479d4e4b5383d5e705db9ab0236b7010419
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 17 18:40:30 2021 +0100

    [hotfix][checkpointing] Extract CheckpointSubsumeHelper
    
    This is a pre-requisite refactoring for a subsequent bug fix.
---
 .../checkpoint/CheckpointSubsumeHelper.java        | 66 ++++++++++++++++++++++
 .../DefaultCompletedCheckpointStore.java           | 45 +++++++--------
 .../EmbeddedCompletedCheckpointStore.java          |  5 +-
 .../StandaloneCompletedCheckpointStore.java        | 10 +---
 4 files changed, 93 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
new file mode 100644
index 0000000..7d3e7aa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+
+/**
+ * Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint
+ * stores}. In general, checkpoints should be subsumed whenever state.checkpoints.num-retained is
+ * exceeded.
+ *
+ * <p>Additional considerations:
+ *
+ * <ul>
+ *   <li>Savepoints must be stored in the same queue to prevent duplicates (@see <a
+ *       href="https://issues.apache.org/jira/browse/FLINK-10354">FLINK-10354</a>).
+ *   <li>To prevent unlimited queue growth, savepoints are also counted in num-retained together
+ *       with checkpoints
+ *   <li>The actual state of savepoints should NOT be discarded when they are subsumed.
+ *   <li>At least one (most recent) checkpoint (not savepoint) should be kept. Otherwise, subsequent
+ *       incremental checkpoints may refer to a discarded state (@see <a
+ *       href="https://issues.apache.org/jira/browse/FLINK-21351">FLINK-21351</a>).
+ *   <li>Except when the job is stopped with a savepoint, as no further checkpoints will be made.
+ * </ul>
+ */
+class CheckpointSubsumeHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
+
+    public static void subsume(
+            Deque<CompletedCheckpoint> checkpoints,
+            int numRetain,
+            ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+            throws Exception {
+        if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
+            return;
+        }
+
+        while (checkpoints.size() > numRetain) {
+            CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
+            try {
+                subsumeAction.accept(completedCheckpoint);
+            } catch (Exception e) {
+                LOG.warn("Fail to subsume the old checkpoint.", e);
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 0db2d07..0f419fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -230,15 +230,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
 
         completedCheckpoints.addLast(checkpoint);
 
-        // Everything worked, let's remove a previous checkpoint if necessary.
-        while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
-            final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
-            tryRemoveCompletedCheckpoint(
-                    completedCheckpoint,
-                    completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                    checkpointsCleaner,
-                    postCleanup);
-        }
+        CheckpointSubsumeHelper.subsume(
+                completedCheckpoints,
+                maxNumberOfCheckpointsToRetain,
+                completedCheckpoint ->
+                        tryRemoveCompletedCheckpoint(
+                                completedCheckpoint,
+                                completedCheckpoint.shouldBeDiscardedOnSubsume(),
+                                checkpointsCleaner,
+                                postCleanup));
 
         LOG.debug("Added {} to {}.", checkpoint, path);
     }
@@ -265,11 +265,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
             LOG.info("Shutting down");
 
             for (CompletedCheckpoint checkpoint : completedCheckpoints) {
-                tryRemoveCompletedCheckpoint(
-                        checkpoint,
-                        checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
-                        checkpointsCleaner,
-                        () -> {});
+                try {
+                    tryRemoveCompletedCheckpoint(
+                            checkpoint,
+                            checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
+                            checkpointsCleaner,
+                            () -> {});
+                } catch (Exception e) {
+                    LOG.warn("Fail to remove checkpoint during shutdown.", e);
+                }
             }
 
             completedCheckpoints.clear();
@@ -292,14 +296,11 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
             CompletedCheckpoint completedCheckpoint,
             boolean shouldDiscard,
             CheckpointsCleaner checkpointsCleaner,
-            Runnable postCleanup) {
-        try {
-            if (tryRemove(completedCheckpoint.getCheckpointID())) {
-                checkpointsCleaner.cleanCheckpoint(
-                        completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to subsume the old checkpoint", e);
+            Runnable postCleanup)
+            throws Exception {
+        if (tryRemove(completedCheckpoint.getCheckpointID())) {
+            checkpointsCleaner.cleanCheckpoint(
+                    completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 174f0ea..d55857c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -62,9 +62,8 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor
             throws Exception {
         checkpoints.addLast(checkpoint);
 
-        if (checkpoints.size() > maxRetainedCheckpoints) {
-            removeOldestCheckpoint();
-        }
+        CheckpointSubsumeHelper.subsume(
+                checkpoints, maxRetainedCheckpoints, CompletedCheckpoint::discardOnSubsume);
     }
 
     @VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 5863ec9..7723ff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -70,14 +70,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 
         checkpoints.addLast(checkpoint);
 
-        if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-            try {
-                CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-                checkpointToSubsume.discardOnSubsume();
-            } catch (Exception e) {
-                LOG.warn("Fail to subsume the old checkpoint.", e);
-            }
-        }
+        CheckpointSubsumeHelper.subsume(
+                checkpoints, maxNumberOfCheckpointsToRetain, CompletedCheckpoint::discardOnSubsume);
     }
 
     @Override