You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/28 07:40:48 UTC
[flink] branch master updated: [FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 431f757a [FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543
431f757a is described below
commit 431f757aa3547833b3684f7e80dcc3ec8d2d8311
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Feb 25 11:17:12 2022 +0100
[FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543
This issue never surfaced in practice for the following reasons:
- getAllAndLock is only called by the CompletedCheckpoint
recovery which happens during the recovery of a job (i.e.
after failover)
- the removal happens through releaseAndTryRemove which can
be called in the following places:
- through DefaultCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne
That happens after a job is already recovered and running
- through DefaultCompletedCheckpointStore.shutdown
...where it's only called if the job reached a
globally-terminal state (i.e. it's not subject to
job recovery)
- through DefaultJobGraphStore.globalCleanupAsync
...which is also only called on jobs that reached a
globally-terminal state (i.e. it's not subject to
job recovery)
- ZooKeeperStateHandleStore.releaseAndTryRemoveAll
...which seems to be legacy code that is no longer used
anywhere in production. I'm leaving it here
because it might make sense to remove ZooKeeper
completely, anyway.
---
.../zookeeper/ZooKeeperStateHandleStore.java | 19 +++++--
.../zookeeper/ZooKeeperStateHandleStoreTest.java | 60 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index d0d00a0..a6ce7c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.utils.ZKPaths;
@@ -352,30 +353,38 @@ public class ZooKeeperStateHandleStore<T extends Serializable>
*/
@Override
public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception {
+ return getAllAndLock(parentNodePath -> client.getChildren().forPath(parentNodePath));
+ }
+
+ @VisibleForTesting
+ List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock(
+ FunctionWithException<String, List<String>, Exception> getNodeChildren)
+ throws Exception {
final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
+ final String rootPath = "/";
boolean success = false;
retry:
while (!success) {
stateHandles.clear();
- Stat stat = client.checkExists().forPath("/");
+ Stat stat = client.checkExists().forPath(rootPath);
if (stat == null) {
break; // Node does not exist, done.
} else {
// Initial cVersion (number of changes to the children of this node)
int initialCVersion = stat.getCversion();
- List<String> children = client.getChildren().forPath("/");
+ final List<String> children = getNodeChildren.apply(rootPath);
for (String path : children) {
- path = "/" + path;
+ path = rootPath + path;
try {
final RetrievableStateHandle<T> stateHandle = getAndLock(path);
stateHandles.add(new Tuple2<>(stateHandle, path));
- } catch (KeeperException.NoNodeException ignored) {
+ } catch (NotExistException ignored) {
// Concurrent deletion, retry
continue retry;
} catch (IOException ioException) {
@@ -387,7 +396,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable>
}
}
- int finalCVersion = client.checkExists().forPath("/").getCversion();
+ int finalCVersion = client.checkExists().forPath(rootPath).getCversion();
// Check for concurrent modifications
success = initialCVersion == finalCVersion;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index 986b26c..2f94827 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
@@ -595,6 +596,65 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
assertEquals(0, expected.size());
}
+ @Test
+ public void testGetAllAndLockOnConcurrentDelete() throws Exception {
+ final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper();
+ final CuratorFramework client =
+ ZooKeeperUtils.useNamespaceAndEnsurePath(
+ ZOOKEEPER.getClient(), "/testGetAllAndLockOnConcurrentDelete");
+
+ // this store simulates the ZooKeeper connection for maintaining the lifecycle (i.e.
+ // creating and deleting the nodes) of the StateHandles
+ final ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>
+ storeForCreationAndDeletion =
+ new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+
+ // this store simulates a concurrent access to ZooKeeper
+ final ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>
+ storeForRetrieval = new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+
+ final String pathInZooKeeperPrefix = "/node";
+
+ final long stateForDeletion = 42L;
+ final String handlePathForDeletion = pathInZooKeeperPrefix + "-for-deletion";
+ storeForCreationAndDeletion.addAndLock(
+ handlePathForDeletion,
+ new TestingLongStateHandleHelper.LongStateHandle(stateForDeletion));
+
+ final long stateToKeep = stateForDeletion + 2;
+ storeForCreationAndDeletion.addAndLock(
+ pathInZooKeeperPrefix + "-keep",
+ new TestingLongStateHandleHelper.LongStateHandle(stateToKeep));
+
+ final List<
+ Tuple2<
+ RetrievableStateHandle<
+ TestingLongStateHandleHelper.LongStateHandle>,
+ String>>
+ actuallyLockedHandles =
+ storeForRetrieval.getAllAndLock(
+ parentPath -> {
+ final List<String> childNodes =
+ client.getChildren().forPath(parentPath);
+ // the following block simulates the concurrent deletion of the
+ // child node after the node names are delivered to the
+ // storeForRetrieval causing a retry
+ if (storeForCreationAndDeletion
+ .exists(handlePathForDeletion)
+ .isExisting()) {
+ storeForCreationAndDeletion.releaseAndTryRemove(
+ handlePathForDeletion);
+ }
+
+ return childNodes;
+ });
+
+ assertEquals(
+ "Only the StateHandle that was expected to be kept should be returned.",
+ stateToKeep,
+ Iterables.getOnlyElement(actuallyLockedHandles).f0.retrieveState().getValue());
+ }
+
/** Tests that the state is returned sorted. */
@Test
public void testGetAllSortedByName() throws Exception {