You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/02/22 20:30:03 UTC
[flink] branch master updated (85879bf -> bd91b6c)
This is an automated email from the ASF dual-hosted git repository.
roman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 85879bf [FLINK-21362][coordination] Move State.onEnter() logic into constructor
new 3897147 [hotfix][checkpointing] Extract CheckpointSubsumeHelper
new bd8e406 [FLINK-21351][checkpointing] Don't subsume last checkpoint
new bd91b6c [hotfix][tests] Remove mock from testAddCheckpointWithFailedRemove
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../checkpoint/CheckpointSubsumeHelper.java | 106 +++++++++++++++++++++
.../DefaultCompletedCheckpointStore.java | 45 ++++-----
.../EmbeddedCompletedCheckpointStore.java | 5 +-
.../StandaloneCompletedCheckpointStore.java | 10 +-
.../DefaultCompletedCheckpointStoreTest.java | 83 +++++++++++++++-
.../StandaloneCompletedCheckpointStoreTest.java | 47 ++++-----
...oKeeperCompletedCheckpointStoreMockitoTest.java | 71 --------------
.../ZooKeeperCompletedCheckpointStoreTest.java | 44 +++++++++
8 files changed, 283 insertions(+), 128 deletions(-)
create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
[flink] 02/03: [FLINK-21351][checkpointing] Don't subsume last
checkpoint
Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd8e406e547973da55fa47e260a910293637d1d7
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 17 18:40:36 2021 +0100
[FLINK-21351][checkpointing] Don't subsume last checkpoint
When a savepoint is added to CompletedCheckpointStore
and the number of checkpoints to retain is 1, all
previous checkpoints are removed.
This makes future incremental checkpoints invalid since
they can refer to the discarded state.
---
.../checkpoint/CheckpointSubsumeHelper.java | 60 +++++++++++++---
.../DefaultCompletedCheckpointStoreTest.java | 83 +++++++++++++++++++++-
2 files changed, 132 insertions(+), 11 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
index 7d3e7aa..da8e575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -17,11 +17,12 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Deque;
+import java.util.Iterator;
+import java.util.Optional;
/**
* Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint
@@ -46,21 +47,60 @@ class CheckpointSubsumeHelper {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
public static void subsume(
- Deque<CompletedCheckpoint> checkpoints,
- int numRetain,
- ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+ Deque<CompletedCheckpoint> checkpoints, int numRetain, SubsumeAction subsumeAction)
throws Exception {
if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
return;
}
+ CompletedCheckpoint latest = checkpoints.peekLast();
+ Optional<CompletedCheckpoint> latestNotSavepoint = getLatestNotSavepoint(checkpoints);
+ Iterator<CompletedCheckpoint> iterator = checkpoints.iterator();
+ while (checkpoints.size() > numRetain && iterator.hasNext()) {
+ CompletedCheckpoint next = iterator.next();
+ if (canSubsume(next, latest, latestNotSavepoint)) {
+ iterator.remove();
+ try {
+ subsumeAction.subsume(next);
+ } catch (Exception e) {
+ LOG.warn("Fail to subsume the old checkpoint.", e);
+ }
+ }
+ // Don't break out from the loop to subsume intermediate savepoints
+ }
+ }
- while (checkpoints.size() > numRetain) {
- CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
- try {
- subsumeAction.accept(completedCheckpoint);
- } catch (Exception e) {
- LOG.warn("Fail to subsume the old checkpoint.", e);
+ private static Optional<CompletedCheckpoint> getLatestNotSavepoint(
+ Deque<CompletedCheckpoint> completed) {
+ Iterator<CompletedCheckpoint> descendingIterator = completed.descendingIterator();
+ while (descendingIterator.hasNext()) {
+ CompletedCheckpoint next = descendingIterator.next();
+ if (!next.getProperties().isSavepoint()) {
+ return Optional.of(next);
}
}
+ return Optional.empty();
+ }
+
+ private static boolean canSubsume(
+ CompletedCheckpoint next,
+ CompletedCheckpoint latest,
+ Optional<CompletedCheckpoint> latestNonSavepoint) {
+ if (next == latest) {
+ return false;
+ } else if (next.getProperties().isSavepoint()) {
+ return true;
+ } else if (latest.getProperties().isSynchronous()) {
+ // If the job has stopped with a savepoint then it's safe to subsume because no future
+ // snapshots will be taken during this run
+ return true;
+ } else {
+ // Don't remove the latest non-savepoint lest invalidate future incremental snapshots
+ return latestNonSavepoint.filter(checkpoint -> checkpoint != next).isPresent();
+ }
+ }
+
+ @FunctionalInterface
+ interface SubsumeAction {
+ void subsume(CompletedCheckpoint checkpoint) throws Exception;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 0430085..bb0578c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.testutils.FlinkMatchers;
@@ -25,6 +26,7 @@ import org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper
import org.apache.flink.runtime.persistence.TestingStateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -35,6 +37,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -44,8 +47,11 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static java.util.Arrays.asList;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -72,6 +78,56 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
executorService.shutdownNow();
}
+ @Test
+ public void testAtLeastOneCheckpointRetained() throws Exception {
+ CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+ CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
+ CompletedCheckpoint sp1 = getCheckpoint(true, 3L);
+ CompletedCheckpoint sp2 = getCheckpoint(true, 4L);
+ CompletedCheckpoint sp3 = getCheckpoint(true, 5L);
+ testCheckpointRetention(1, asList(cp1, cp2, sp1, sp2, sp3), asList(cp2, sp3));
+ }
+
+ @Test
+ public void testOlderSavepointSubsumed() throws Exception {
+ CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+ CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
+ CompletedCheckpoint cp2 = getCheckpoint(false, 3L);
+ testCheckpointRetention(1, asList(cp1, sp1, cp2), asList(cp2));
+ }
+
+ @Test
+ public void testSubsumeAfterStoppingWithSavepoint() throws Exception {
+ CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+ CompletedCheckpoint sp1 = getCheckpoint(true, 2L);
+ CompletedCheckpoint stop = getCheckpoint(CheckpointProperties.forSyncSavepoint(false), 3L);
+ testCheckpointRetention(1, asList(cp1, sp1, stop), asList(stop));
+ }
+
+ @Test
+ public void testNotSubsumedIfNotNeeded() throws Exception {
+ CompletedCheckpoint cp1 = getCheckpoint(false, 1L);
+ CompletedCheckpoint cp2 = getCheckpoint(false, 2L);
+ CompletedCheckpoint cp3 = getCheckpoint(false, 3L);
+ testCheckpointRetention(3, asList(cp1, cp2, cp3), asList(cp1, cp2, cp3));
+ }
+
+ private void testCheckpointRetention(
+ int numRetain,
+ List<CompletedCheckpoint> completed,
+ List<CompletedCheckpoint> expectedRetained)
+ throws Exception {
+ final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
+ builder.setGetAllSupplier(() -> createStateHandles(3)).build();
+ final CompletedCheckpointStore completedCheckpointStore =
+ createCompletedCheckpointStore(stateHandleStore, numRetain);
+
+ for (CompletedCheckpoint c : completed) {
+ completedCheckpointStore.addCheckpoint(c, new CheckpointsCleaner(), () -> {});
+ }
+ assertEquals(expectedRetained, completedCheckpointStore.getAllCheckpoints());
+ }
+
/**
* We have three completed checkpoints(1, 2, 3) in the state handle store. We expect that {@link
* DefaultCompletedCheckpointStore#recover()} should recover the sorted checkpoints by name.
@@ -298,8 +354,13 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
private CompletedCheckpointStore createCompletedCheckpointStore(
TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) {
+ return createCompletedCheckpointStore(stateHandleStore, 1);
+ }
+
+ private CompletedCheckpointStore createCompletedCheckpointStore(
+ TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int toRetain) {
return new DefaultCompletedCheckpointStore<>(
- 1,
+ toRetain,
stateHandleStore,
new CheckpointStoreUtil() {
@Override
@@ -314,4 +375,24 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger {
},
executorService);
}
+
+ private CompletedCheckpoint getCheckpoint(boolean isSavepoint, long id) {
+ return getCheckpoint(
+ isSavepoint
+ ? CheckpointProperties.forSavepoint(false)
+ : CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+ id);
+ }
+
+ private CompletedCheckpoint getCheckpoint(CheckpointProperties props, long id) {
+ return new CompletedCheckpoint(
+ new JobID(),
+ id,
+ 0L,
+ 0L,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ props,
+ new TestCompletedCheckpointStorageLocation());
+ }
}
[flink] 03/03: [hotfix][tests] Remove mock from
testAddCheckpointWithFailedRemove
Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd91b6c9d7a3606fd8caee4be62cab2c582f1d53
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Fri Feb 19 16:27:23 2021 +0100
[hotfix][tests] Remove mock from testAddCheckpointWithFailedRemove
Additionally, remove dead code and check that an
exception was thrown.
---
.../StandaloneCompletedCheckpointStoreTest.java | 47 +++++++-------
...oKeeperCompletedCheckpointStoreMockitoTest.java | 71 ----------------------
.../ZooKeeperCompletedCheckpointStoreTest.java | 44 ++++++++++++++
3 files changed, 68 insertions(+), 94 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 54fdb09..e2c619f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -22,23 +22,20 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.junit.Test;
-import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mock;
/** Tests for basic {@link CompletedCheckpointStore} contract. */
public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {
@@ -100,24 +97,28 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
CompletedCheckpointStore store =
createCompletedCheckpoints(numCheckpointsToRetain, Executors.directExecutor());
- for (long i = 0; i <= numCheckpointsToRetain; ++i) {
- CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
- doReturn(i).when(checkpointToAdd).getCheckpointID();
- doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
- doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();
-
- try {
- store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {});
-
- // The checkpoint should be in the store if we successfully add it into the store.
- List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
- assertTrue(addedCheckpoints.contains(checkpointToAdd));
- } catch (Exception e) {
- // The checkpoint should not be in the store if any exception is thrown.
- List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints();
- assertFalse(addedCheckpoints.contains(checkpointToAdd));
- }
+ CountDownLatch discardAttempted = new CountDownLatch(1);
+ for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
+ CompletedCheckpoint checkpointToAdd =
+ new CompletedCheckpoint(
+ new JobID(),
+ i,
+ i,
+ i,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+ new TestCompletedCheckpointStorageLocation()) {
+ @Override
+ public boolean discardOnSubsume() {
+ discardAttempted.countDown();
+ throw new RuntimeException();
+ }
+ };
+ // shouldn't fail despite the exception
+ store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {});
}
+ discardAttempted.await();
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 8b7c9fc..af8c091 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -42,20 +42,15 @@ import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -363,70 +358,4 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
// are subsumed should they be discarded.
verify(failingRetrievableStateHandle, never()).discardState();
}
-
- /**
- * Tests that the checkpoint does not exist in the store when we fail to add it into the store
- * (i.e., there exists an exception thrown by the method).
- */
- @Test
- public void testAddCheckpointWithFailedRemove() throws Exception {
- final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
- final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
- mock(RetrievableStateStorageHelper.class);
-
- ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock =
- spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock));
-
- doAnswer(
- new Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
- @Override
- public RetrievableStateHandle<CompletedCheckpoint> answer(
- InvocationOnMock invocationOnMock) throws Throwable {
- CompletedCheckpoint checkpoint =
- (CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
- RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle =
- mock(RetrievableStateHandle.class);
- when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
- return retrievableStateHandle;
- }
- })
- .when(zookeeperStateHandleStoreMock)
- .addAndLock(anyString(), any(CompletedCheckpoint.class));
-
- doThrow(new Exception())
- .when(zookeeperStateHandleStoreMock)
- .releaseAndTryRemove(anyString());
-
- final int numCheckpointsToRetain = 1;
-
- CompletedCheckpointStore zooKeeperCompletedCheckpointStore =
- new DefaultCompletedCheckpointStore<>(
- numCheckpointsToRetain,
- zookeeperStateHandleStoreMock,
- zooKeeperCheckpointStoreUtil,
- Executors.directExecutor());
-
- for (long i = 0; i <= numCheckpointsToRetain; ++i) {
- CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
- doReturn(i).when(checkpointToAdd).getCheckpointID();
- doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
- try {
- zooKeeperCompletedCheckpointStore.addCheckpoint(
- checkpointToAdd, new CheckpointsCleaner(), () -> {});
-
- // The checkpoint should be in the store if we successfully add it into the store.
- List<CompletedCheckpoint> addedCheckpoints =
- zooKeeperCompletedCheckpointStore.getAllCheckpoints();
- assertTrue(addedCheckpoints.contains(checkpointToAdd));
- } catch (Exception e) {
- // The checkpoint should not be in the store if any exception is thrown.
- List<CompletedCheckpoint> addedCheckpoints =
- zooKeeperCompletedCheckpointStore.getAllCheckpoints();
- assertFalse(addedCheckpoints.contains(checkpointToAdd));
- }
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0811a7e..e17a4f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
@@ -42,10 +44,13 @@ import org.junit.Test;
import javax.annotation.Nonnull;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.IntStream;
+import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.rethrow;
@@ -296,4 +301,43 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
return 0;
}
}
+
+ /**
+ * Tests that adding checkpoints does not fail when the cleanup of a subsumed checkpoint throws
+ * an exception.
+ */
+ @Test
+ public void testAddCheckpointWithFailedRemove() throws Exception {
+
+ final int numCheckpointsToRetain = 1;
+ final Configuration configuration = new Configuration();
+ configuration.setString(
+ HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
+
+ final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+ final CompletedCheckpointStore store = createZooKeeperCheckpointStore(client);
+
+ CountDownLatch discardAttempted = new CountDownLatch(1);
+ for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
+ CompletedCheckpoint checkpointToAdd =
+ new CompletedCheckpoint(
+ new JobID(),
+ i,
+ i,
+ i,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+ new TestCompletedCheckpointStorageLocation());
+ // shouldn't fail despite the exception
+ store.addCheckpoint(
+ checkpointToAdd,
+ new CheckpointsCleaner(),
+ () -> {
+ discardAttempted.countDown();
+ throw new RuntimeException();
+ });
+ }
+ discardAttempted.await();
+ }
}
[flink] 01/03: [hotfix][checkpointing] Extract
CheckpointSubsumeHelper
Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 38971479d4e4b5383d5e705db9ab0236b7010419
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 17 18:40:30 2021 +0100
[hotfix][checkpointing] Extract CheckpointSubsumeHelper
This is a pre-requisite refactoring for a subsequent bug fix.
---
.../checkpoint/CheckpointSubsumeHelper.java | 66 ++++++++++++++++++++++
.../DefaultCompletedCheckpointStore.java | 45 +++++++--------
.../EmbeddedCompletedCheckpointStore.java | 5 +-
.../StandaloneCompletedCheckpointStore.java | 10 +---
4 files changed, 93 insertions(+), 33 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
new file mode 100644
index 0000000..7d3e7aa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointSubsumeHelper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+
+/**
+ * Encapsulates the logic to subsume older checkpoints by {@link CompletedCheckpointStore checkpoint
+ * stores}. In general, checkpoints should be subsumed whenever state.checkpoints.num-retained is
+ * exceeded.
+ *
+ * <p>Additional considerations:
+ *
+ * <ul>
+ * <li>Savepoints must be stored in the same queue to prevent duplicates (@see <a
+ * href="https://issues.apache.org/jira/browse/FLINK-10354">FLINK-10354</a>).
+ * <li>To prevent unlimited queue growth, savepoints are also counted in num-retained together
+ * with checkpoints
+ * <li>The actual state of savepoints should NOT be discarded when they are subsumed.
+ * <li>At least one (most recent) checkpoint (not savepoint) should be kept. Otherwise, subsequent
+ * incremental checkpoints may refer to a discarded state (@see <a
+ * href="https://issues.apache.org/jira/browse/FLINK-21351">FLINK-21351</a>).
+ * <li>Except when the job is stopped with a savepoint, since no future checkpoints will be made.
+ * </ul>
+ */
+class CheckpointSubsumeHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointSubsumeHelper.class);
+
+ public static void subsume(
+ Deque<CompletedCheckpoint> checkpoints,
+ int numRetain,
+ ThrowingConsumer<CompletedCheckpoint, Exception> subsumeAction)
+ throws Exception {
+ if (checkpoints.isEmpty() || checkpoints.size() <= numRetain) {
+ return;
+ }
+
+ while (checkpoints.size() > numRetain) {
+ CompletedCheckpoint completedCheckpoint = checkpoints.removeFirst();
+ try {
+ subsumeAction.accept(completedCheckpoint);
+ } catch (Exception e) {
+ LOG.warn("Fail to subsume the old checkpoint.", e);
+ }
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 0db2d07..0f419fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -230,15 +230,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
completedCheckpoints.addLast(checkpoint);
- // Everything worked, let's remove a previous checkpoint if necessary.
- while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
- final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst();
- tryRemoveCompletedCheckpoint(
- completedCheckpoint,
- completedCheckpoint.shouldBeDiscardedOnSubsume(),
- checkpointsCleaner,
- postCleanup);
- }
+ CheckpointSubsumeHelper.subsume(
+ completedCheckpoints,
+ maxNumberOfCheckpointsToRetain,
+ completedCheckpoint ->
+ tryRemoveCompletedCheckpoint(
+ completedCheckpoint,
+ completedCheckpoint.shouldBeDiscardedOnSubsume(),
+ checkpointsCleaner,
+ postCleanup));
LOG.debug("Added {} to {}.", checkpoint, path);
}
@@ -265,11 +265,15 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
LOG.info("Shutting down");
for (CompletedCheckpoint checkpoint : completedCheckpoints) {
- tryRemoveCompletedCheckpoint(
- checkpoint,
- checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
- checkpointsCleaner,
- () -> {});
+ try {
+ tryRemoveCompletedCheckpoint(
+ checkpoint,
+ checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
+ checkpointsCleaner,
+ () -> {});
+ } catch (Exception e) {
+ LOG.warn("Fail to remove checkpoint during shutdown.", e);
+ }
}
completedCheckpoints.clear();
@@ -292,14 +296,11 @@ public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
CompletedCheckpoint completedCheckpoint,
boolean shouldDiscard,
CheckpointsCleaner checkpointsCleaner,
- Runnable postCleanup) {
- try {
- if (tryRemove(completedCheckpoint.getCheckpointID())) {
- checkpointsCleaner.cleanCheckpoint(
- completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
- }
- } catch (Exception e) {
- LOG.warn("Failed to subsume the old checkpoint", e);
+ Runnable postCleanup)
+ throws Exception {
+ if (tryRemove(completedCheckpoint.getCheckpointID())) {
+ checkpointsCleaner.cleanCheckpoint(
+ completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 174f0ea..d55857c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -62,9 +62,8 @@ public class EmbeddedCompletedCheckpointStore implements CompletedCheckpointStor
throws Exception {
checkpoints.addLast(checkpoint);
- if (checkpoints.size() > maxRetainedCheckpoints) {
- removeOldestCheckpoint();
- }
+ CheckpointSubsumeHelper.subsume(
+ checkpoints, maxRetainedCheckpoints, CompletedCheckpoint::discardOnSubsume);
}
@VisibleForTesting
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 5863ec9..7723ff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -70,14 +70,8 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
checkpoints.addLast(checkpoint);
- if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
- try {
- CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
- checkpointToSubsume.discardOnSubsume();
- } catch (Exception e) {
- LOG.warn("Fail to subsume the old checkpoint.", e);
- }
- }
+ CheckpointSubsumeHelper.subsume(
+ checkpoints, maxNumberOfCheckpointsToRetain, CompletedCheckpoint::discardOnSubsume);
}
@Override