You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/28 07:40:48 UTC

[flink] branch master updated: [FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 431f757a [FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543
431f757a is described below

commit 431f757aa3547833b3684f7e80dcc3ec8d2d8311
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Fri Feb 25 11:17:12 2022 +0100

    [FLINK-26285] Fixes an inconsistency which was introduced by c3a6b51 as part of changes done for FLINK-19543
    
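    getAllAndLock still caught KeeperException.NoNodeException to detect
    a concurrent deletion of a child node, whereas getAndLock reports a
    missing node through a NotExistException. Hence, a concurrent
    deletion would not have triggered the intended retry.

    This issue never appeared because of the following reasons: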
    - getAllAndLock is only called by the CompletedCheckpoint
      recovery which happens during the recovery of a job (i.e.
      after failover)
    - the removal happens through releaseAndTryRemove which can
      be called in the following places:
      - through DefaultCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne
        ...which happens after a job is already recovered and running
      - through DefaultCompletedCheckpointStore.shutdown
        ...where it's only called if the job reached a
        globally-terminal state (i.e. it's not subject to
        job recovery)
      - through DefaultJobGraphStore.globalCleanupAsync
        ...which is also only called on jobs that reached a
        globally-terminal state (i.e. it's not subject to
        job recovery)
      - through ZooKeeperStateHandleStore.releaseAndTryRemoveAll
        ...which seems to be legacy code that is no longer used
        anywhere in production. I'm leaving it in place because
        it might make sense to remove ZooKeeper completely,
        anyway.
---
 .../zookeeper/ZooKeeperStateHandleStore.java       | 19 +++++--
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 60 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index d0d00a0..a6ce7c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.persistence.StateHandleStore;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.shaded.curator5.org.apache.curator.utils.ZKPaths;
@@ -352,30 +353,38 @@ public class ZooKeeperStateHandleStore<T extends Serializable>
      */
     @Override
     public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception {
+        return getAllAndLock(parentNodePath -> client.getChildren().forPath(parentNodePath));
+    }
+
+    @VisibleForTesting
+    List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock(
+            FunctionWithException<String, List<String>, Exception> getNodeChildren)
+            throws Exception {
         final List<Tuple2<RetrievableStateHandle<T>, String>> stateHandles = new ArrayList<>();
 
+        final String rootPath = "/";
         boolean success = false;
 
         retry:
         while (!success) {
             stateHandles.clear();
 
-            Stat stat = client.checkExists().forPath("/");
+            Stat stat = client.checkExists().forPath(rootPath);
             if (stat == null) {
                 break; // Node does not exist, done.
             } else {
                 // Initial cVersion (number of changes to the children of this node)
                 int initialCVersion = stat.getCversion();
 
-                List<String> children = client.getChildren().forPath("/");
+                final List<String> children = getNodeChildren.apply(rootPath);
 
                 for (String path : children) {
-                    path = "/" + path;
+                    path = rootPath + path;
 
                     try {
                         final RetrievableStateHandle<T> stateHandle = getAndLock(path);
                         stateHandles.add(new Tuple2<>(stateHandle, path));
-                    } catch (KeeperException.NoNodeException ignored) {
+                    } catch (NotExistException ignored) {
                         // Concurrent deletion, retry
                         continue retry;
                     } catch (IOException ioException) {
@@ -387,7 +396,7 @@ public class ZooKeeperStateHandleStore<T extends Serializable>
                     }
                 }
 
-                int finalCVersion = client.checkExists().forPath("/").getCversion();
+                int finalCVersion = client.checkExists().forPath(rootPath).getCversion();
 
                 // Check for concurrent modifications
                 success = initialCVersion == finalCVersion;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index 986b26c..2f94827 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
 
@@ -595,6 +596,65 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
         assertEquals(0, expected.size());
     }
 
+    @Test
+    public void testGetAllAndLockOnConcurrentDelete() throws Exception {
+        final TestingLongStateHandleHelper stateHandleProvider = new TestingLongStateHandleHelper();
+        final CuratorFramework client =
+                ZooKeeperUtils.useNamespaceAndEnsurePath(
+                        ZOOKEEPER.getClient(), "/testGetAllAndLockOnConcurrentDelete");
+
+        // this store owns the lifecycle of the StateHandles, i.e. it creates and deletes the
+        // underlying ZooKeeper nodes
+        final ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>
+                storeForCreationAndDeletion =
+                        new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+
+        // this second store reads and locks the same nodes concurrently via getAllAndLock
+        final ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle>
+                storeForRetrieval = new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+
+        final String pathInZooKeeperPrefix = "/node";
+
+        final long stateForDeletion = 42L;
+        final String handlePathForDeletion = pathInZooKeeperPrefix + "-for-deletion";
+        storeForCreationAndDeletion.addAndLock(
+                handlePathForDeletion,
+                new TestingLongStateHandleHelper.LongStateHandle(stateForDeletion));
+
+        final long stateToKeep = stateForDeletion + 2;
+        storeForCreationAndDeletion.addAndLock(
+                pathInZooKeeperPrefix + "-keep",
+                new TestingLongStateHandleHelper.LongStateHandle(stateToKeep));
+
+        final List<
+                        Tuple2<
+                                RetrievableStateHandle<
+                                        TestingLongStateHandleHelper.LongStateHandle>,
+                                String>>
+                actuallyLockedHandles =
+                        storeForRetrieval.getAllAndLock(
+                                parentPath -> {
+                                    final List<String> childNodes =
+                                            client.getChildren().forPath(parentPath);
+                                    // simulate a concurrent deletion: the child is removed after
+                                    // its name was listed but before it gets locked, which makes
+                                    // getAllAndLock hit a NotExistException and retry
+                                    if (storeForCreationAndDeletion
+                                            .exists(handlePathForDeletion)
+                                            .isExisting()) {
+                                        storeForCreationAndDeletion.releaseAndTryRemove(
+                                                handlePathForDeletion);
+                                    }
+
+                                    return childNodes;
+                                });
+
+        assertEquals(
+                "Only the StateHandle that was expected to be kept should be returned.",
+                stateToKeep,
+                Iterables.getOnlyElement(actuallyLockedHandles).f0.retrieveState().getValue());
+    }
+
     /** Tests that the state is returned sorted. */
     @Test
     public void testGetAllSortedByName() throws Exception {