Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/29 13:50:48 UTC

[GitHub] [flink] dmvk opened a new pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

dmvk opened a new pull request #17607:
URL: https://github.com/apache/flink/pull/17607


   This PR introduces idempotent transactions in the `ZooKeeperStateHandleStore#addAndLock` method. With an unstable connection, we may retry a transaction that has actually already succeeded, and therefore encounter a `NodeExistsException`.
   
   https://issues.apache.org/jira/browse/FLINK-24543
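   
   To make the failure mode concrete, here is a minimal, dependency-free Java sketch (not Flink's or Curator's code). `FlakyStore`, `createIdempotently` and the two exception classes are hypothetical stand-ins for a ZooKeeper client: the first create is applied on the server but its reply is lost, so the retry hits "node exists". The sketch treats that as success only if the existing node holds exactly the bytes this writer tried to store; comparing payloads is one possible idempotency check, assumed here for illustration.
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.Map;

   public class IdempotentCreateSketch {

       // Hypothetical stand-ins for ZooKeeper's NodeExists / ConnectionLoss errors
       // (unchecked here only to keep the sketch short).
       static class NodeExistsException extends RuntimeException {}

       static class ConnectionLossException extends RuntimeException {}

       /** Toy in-memory "server" whose first create succeeds but whose reply gets lost. */
       static class FlakyStore {
           final Map<String, byte[]> nodes = new HashMap<>();
           boolean dropNextReply = true;

           void create(String path, byte[] data) {
               if (nodes.containsKey(path)) {
                   throw new NodeExistsException();
               }
               nodes.put(path, data.clone());
               if (dropNextReply) {
                   dropNextReply = false;
                   // The write is applied, but the client only sees a connection loss.
                   throw new ConnectionLossException();
               }
           }
       }

       /**
        * Retries on connection loss. A NodeExists seen after an ambiguous attempt is
        * treated as success only if the stored bytes are exactly ours.
        */
       static boolean createIdempotently(FlakyStore store, String path, byte[] data, int maxRetries) {
           boolean outcomeUnknown = false;
           for (int attempt = 0; attempt <= maxRetries; attempt++) {
               try {
                   store.create(path, data);
                   return true;
               } catch (ConnectionLossException e) {
                   outcomeUnknown = true; // the write may or may not have been applied
               } catch (NodeExistsException e) {
                   if (outcomeUnknown && Arrays.equals(store.nodes.get(path), data)) {
                       return true; // an earlier "failed" attempt actually committed
                   }
                   throw e; // the node belongs to someone else
               }
           }
           return false; // retries exhausted without a definite answer
       }

       public static void main(String[] args) {
           FlakyStore store = new FlakyStore();
           System.out.println(createIdempotently(store, "/jobs/a", new byte[] {1, 2, 3}, 3));
       }
   }
   ```
   
   Payload comparison cannot distinguish two writers that store identical bytes, so the sketch assumes a single writer per path, which is what the leader election is expected to guarantee.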


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755095766



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,
+                            KeeperException.BadArgumentsException.class,
+                            KeeperException.NoNodeException.class,
+                            KeeperException.NoAuthException.class,
+                            KeeperException.BadVersionException.class,
+                            KeeperException.AuthFailedException.class,
+                            KeeperException.InvalidACLException.class,
+                            KeeperException.SessionMovedException.class,
+                            KeeperException.NotReadOnlyException.class));

Review comment:
       I'd say it could happen under very rare conditions (if a connection loss happens right before the retries are exhausted). We default to 3 retries with exponential backoff.
   
   We could also argue that this could happen with `AuthFailedException` / `InvalidACLException` when there is some manual operation on the ZooKeeper side, e.g. a sysadmin changes the ACL.
   
   Description of `SessionMovedException`
   ```
   SESSIONMOVED
   Session moved to another server, so operation is ignored
   ```
   
   From `RetryLoop`:
   ```
   public static boolean shouldRetry(int rc) {
       return rc == Code.CONNECTIONLOSS.intValue()
               || rc == Code.OPERATIONTIMEOUT.intValue()
               || rc == Code.SESSIONMOVED.intValue()
               || rc == Code.SESSIONEXPIRED.intValue()
               || rc == -13; // -13 is KeeperException.Code.NEWCONFIGNOQUORUM
   }
   ```
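   
   As a rough illustration of the retry budget, the sketch below mirrors the return codes checked by the quoted `shouldRetry` (numeric values of ZooKeeper's `KeeperException.Code`, hard-coded so it compiles without ZooKeeper) and computes a deterministic exponential backoff schedule. `baseSleepMs` and the plain doubling rule are assumptions for illustration; Curator's `ExponentialBackoffRetry` additionally randomizes the delay.
   
   ```java
   public class RetryScheduleSketch {

       // Numeric values of org.apache.zookeeper.KeeperException.Code.
       static final int CONNECTIONLOSS = -4;
       static final int OPERATIONTIMEOUT = -7;
       static final int NEWCONFIGNOQUORUM = -13;
       static final int SESSIONEXPIRED = -112;
       static final int SESSIONMOVED = -118;

       /** Same set of codes as the quoted RetryLoop#shouldRetry. */
       static boolean shouldRetry(int rc) {
           return rc == CONNECTIONLOSS
                   || rc == OPERATIONTIMEOUT
                   || rc == SESSIONMOVED
                   || rc == SESSIONEXPIRED
                   || rc == NEWCONFIGNOQUORUM;
       }

       /** Delay before retry number {@code retryCount} (0-based): base * 2^retryCount, no jitter. */
       static long backoffMs(long baseSleepMs, int retryCount) {
           return baseSleepMs << retryCount;
       }

       public static void main(String[] args) {
           int maxRetries = 3; // the default mentioned in the review comment
           long total = 0;
           for (int i = 0; i < maxRetries; i++) {
               total += backoffMs(100, i); // hypothetical 100 ms base
           }
           System.out.println(total); // 100 + 200 + 400
           System.out.println(shouldRetry(CONNECTIONLOSS) + " " + shouldRetry(-110)); // -110 = NODEEXISTS
       }
   }
   ```
   
   The concern in this thread sits at the end of such a schedule: if an attempt ends in a connection loss after the server already applied the write, the caller only ever sees an ambiguous outcome.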







[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * eebbec021e44ecfa2b3ef49c6a0265abce126bbc Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26217) 
   * b8fc097e73709a1f449b66ee31d744caa2ebb8e7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27123) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dmvk commented on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-964100082


   @tillrohrmann Thanks for the review! I've over-complicated the initial change-set. I think it's way better now 👍
   
   Ready for another pass. 





[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755118309



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,
+                            KeeperException.BadArgumentsException.class,
+                            KeeperException.NoNodeException.class,
+                            KeeperException.NoAuthException.class,
+                            KeeperException.BadVersionException.class,
+                            KeeperException.AuthFailedException.class,
+                            KeeperException.InvalidACLException.class,
+                            KeeperException.SessionMovedException.class,
+                            KeeperException.NotReadOnlyException.class));

Review comment:
       I'm just desperate here... I'd say that, except for `NoNodeException` and `BadArgumentsException`, all of them are "potentially" unsafe if preceded by a connection loss.
   
   I'd be in favor of removing this list completely, as it's really hard to reason about.
   
   I'd even suggest reverting https://issues.apache.org/jira/browse/FLINK-22494 and coming up with a different solution (discarding the checkpoint and validating its metadata before recovery).
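   
   The rule proposed in this comment can be written down as a small predicate. The sketch below uses hypothetical names and is not the code that was merged: it looks at the full history of error codes for one write and reports whether the final error proves that nothing was committed. It assumes that codes outside the two sets are the pre-commit errors from the diff's list, and it follows the comment in treating `NoNode` / `BadArguments` as safe even after a connection loss.
   
   ```java
   import java.util.List;
   import java.util.Set;

   public class DiscardDecisionSketch {

       // Numeric values of org.apache.zookeeper.KeeperException.Code.
       static final int CONNECTIONLOSS = -4;
       static final int OPERATIONTIMEOUT = -7;
       static final int BADARGUMENTS = -8;
       static final int NONODE = -101;
       static final int NODEEXISTS = -110;
       static final int SESSIONEXPIRED = -112;
       static final int SESSIONMOVED = -118;

       /** Errors after which the write may have been applied although the client saw a failure. */
       static final Set<Integer> AMBIGUOUS =
               Set.of(CONNECTIONLOSS, OPERATIONTIMEOUT, SESSIONEXPIRED, SESSIONMOVED);

       /** Errors that, per the review comment, stay conclusive even after a connection loss. */
       static final Set<Integer> ALWAYS_SAFE = Set.of(NONODE, BADARGUMENTS);

       /**
        * @param attemptCodes error codes of every attempt of one write, in order; the last one is
        *     the error finally reported to the caller
        * @return true if the stored state can be discarded without risking an inconsistent store
        */
       static boolean safeToDiscard(List<Integer> attemptCodes) {
           int last = attemptCodes.get(attemptCodes.size() - 1);
           if (AMBIGUOUS.contains(last)) {
               return false; // retries exhausted on an ambiguous error
           }
           if (ALWAYS_SAFE.contains(last)) {
               return true;
           }
           // Any other pre-commit error is only conclusive if no earlier attempt was ambiguous.
           for (int rc : attemptCodes.subList(0, attemptCodes.size() - 1)) {
               if (AMBIGUOUS.contains(rc)) {
                   return false;
               }
           }
           return true;
       }
   }
   ```
   
   For example, `[NODEEXISTS]` is safe to discard, while `[CONNECTIONLOSS, NODEEXISTS]` is not, which is exactly the retried-transaction case this PR is about.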







[GitHub] [flink] flinkbot commented on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 7ccc4b0240e733447c42bbfa48911f768afac57d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 7ccc4b0240e733447c42bbfa48911f768afac57d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25646) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743562938



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       It could indeed result in a `NodeExistsException`. However, this wouldn't introduce inconsistencies, because we don't discard the underlying state.
   
   On the other hand, this would trigger the "unexpected" catch block below ... 🤔 
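   
   One way to tell a retried lock creation apart from a genuine conflict, sketched here purely as an illustration: ZooKeeper records the creating session of an ephemeral node (`Stat#getEphemeralOwner`), so a `NodeExistsException` on a lock owned by our own session is harmless. The in-memory `Store` and `acquire` below are hypothetical stand-ins, not ZooKeeper or Curator APIs.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class EphemeralLockOwnerSketch {

       static class NodeExistsException extends RuntimeException {}

       /** Toy store recording, for each ephemeral node, the session that created it. */
       static class Store {
           final Map<String, Long> ephemeralOwner = new HashMap<>();

           void createEphemeral(String path, long sessionId) {
               if (ephemeralOwner.containsKey(path)) {
                   throw new NodeExistsException();
               }
               ephemeralOwner.put(path, sessionId);
           }
       }

       /** Returns true if we hold the lock afterwards, false if another session holds it. */
       static boolean acquire(Store store, String lockPath, long mySessionId) {
           try {
               store.createEphemeral(lockPath, mySessionId);
               return true;
           } catch (NodeExistsException e) {
               // A retried create whose first attempt already succeeded also lands here;
               // the owner session tells the two cases apart.
               Long owner = store.ephemeralOwner.get(lockPath);
               return owner != null && owner == mySessionId;
           }
       }
   }
   ```
   
   This only helps while the session is still alive; once a session expires, ZooKeeper removes its ephemeral nodes anyway.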

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       We can simply get rid of this write lock. It was meant as a safeguard, but it seems to introduce more corner cases. If we trust the leader election mechanism here, it's not needed. I'll remove it.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {

Review comment:
       We now check whether the node already exists before executing `writeStoreHandleTransactionally()`, so this is hopefully covered by that. IMO, we can trust the transaction mechanism here (assuming there are no concurrent modifications to the znode).
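   
   A compact model of the flow that remains once the write lock is dropped: check that the node does not exist yet, store the state, write the handle, and on failure either keep the state (outcome unknown) or discard it (write definitely not applied). `StateStorage`, `Writer` and the exception classes are hypothetical stand-ins for `storage.store(...)`, `writeStoreHandleTransactionally(...)`, `AlreadyExistException` and `PossibleInconsistentStateException` from the diff, and the sketch assumes a single writer per path.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class AddAndLockFlowSketch {

       static class AlreadyExistsException extends RuntimeException {}

       static class ConnectionLossException extends RuntimeException {}

       /** Stand-in for a failure after which the write may or may not have been applied. */
       static class PossiblyInconsistentException extends RuntimeException {
           PossiblyInconsistentException(Throwable cause) {
               super(cause);
           }
       }

       /** Toy state storage that tracks which handles are still alive (not discarded). */
       static class StateStorage {
           final Map<String, String> live = new HashMap<>();
           int counter = 0;

           String store(String state) {
               String handle = "handle-" + (counter++);
               live.put(handle, state);
               return handle;
           }

           void discard(String handle) {
               live.remove(handle);
           }
       }

       /** Writes the handle to the path; may throw. */
       interface Writer {
           void write(String path, String handle);
       }

       static String addAndLock(
               Map<String, String> znodes, StateStorage storage, Writer writer, String path, String state) {
           if (znodes.containsKey(path)) {
               throw new AlreadyExistsException(); // nothing stored yet, nothing to clean up
           }
           String handle = storage.store(state);
           try {
               writer.write(path, handle);
               return handle;
           } catch (ConnectionLossException e) {
               // The write may have committed: keep the state so a committed handle never dangles.
               throw new PossiblyInconsistentException(e);
           } catch (RuntimeException e) {
               // Definitely not committed: discard the state and rethrow.
               storage.discard(handle);
               throw e;
           }
       }
   }
   ```
   
   Keeping the state on an ambiguous failure may leak it, but discarding it could leave ZooKeeper pointing at deleted data, which is the inconsistency this PR tries to avoid.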







[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743563968



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       We can simply get rid of this write lock. It was meant as a safeguard, but it seems to introduce more corner cases. If we trust the leader election mechanism here, it's not needed. I'll remove it.







[GitHub] [flink] tillrohrmann commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r756909528



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,
+                            KeeperException.BadArgumentsException.class,
+                            KeeperException.NoNodeException.class,
+                            KeeperException.NoAuthException.class,
+                            KeeperException.BadVersionException.class,
+                            KeeperException.AuthFailedException.class,
+                            KeeperException.InvalidACLException.class,
+                            KeeperException.SessionMovedException.class,
+                            KeeperException.NotReadOnlyException.class));

Review comment:
       Agreed.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,

Review comment:
       Ok, let's tackle this with the follow-up issue.







[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743565748



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {

Review comment:
       We now check whether the node already exists before executing `writeStoreHandleTransactionally()`, so this case is hopefully covered by that check. IMO we can trust the transaction mechanism here (assuming there are no concurrent modifications to the znode).
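The ordering the reply relies on (check for the node first, persist the state second, write the handle last) can be sketched with an in-memory stand-in for ZooKeeper. This is a minimal sketch, not Flink's code. It assumes a single writer per path, as the reply does. `InMemoryZk`, `NodeExists` and `AlreadyExists` are hypothetical stand-ins for the Curator client, `KeeperException.NodeExistsException` and Flink's `AlreadyExistException`. In the sketch, an existing node is rejected before any state is stored, so nothing needs to be discarded.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KeeperException.NodeExistsException.
class NodeExists extends Exception {
    NodeExists(String path) {
        super(path);
    }
}

// Hypothetical stand-in for Flink's AlreadyExistException.
class AlreadyExists extends Exception {
    AlreadyExists(String message) {
        super(message);
    }
}

// Hypothetical in-memory model of the znodes touched by addAndLock.
class InMemoryZk {
    final Map<String, byte[]> nodes = new HashMap<>();
    int persistedStates = 0; // counts calls to the (expensive) state storage

    // Creates the state node and its lock child atomically, like the Curator transaction.
    void createWithLock(String path, byte[] handle) throws NodeExists {
        if (nodes.containsKey(path) || nodes.containsKey(path + "/lock")) {
            throw new NodeExists(path);
        }
        nodes.put(path, handle);
        nodes.put(path + "/lock", new byte[0]);
    }

    // Check first, persist the state second, write the handle last.
    void addAndLock(String path, byte[] handle) throws AlreadyExists, NodeExists {
        if (nodes.containsKey(path)) {
            throw new AlreadyExists("ZooKeeper node " + path + " already exists.");
        }
        persistedStates++; // stands in for storage.store(state)
        createWithLock(path, handle);
    }
}
```

The pre-check only narrows the window. Another writer could still create the node between the check and the transaction, which is why the reply makes the single-writer assumption explicit.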




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 3aaabcc0d323dc042455d69a707bfccbb3d517b4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25955) 
   * eebbec021e44ecfa2b3ef49c6a0265abce126bbc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26217) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 3aaabcc0d323dc042455d69a707bfccbb3d517b4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25955) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * eebbec021e44ecfa2b3ef49c6a0265abce126bbc Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26217) 
   * b8fc097e73709a1f449b66ee31d744caa2ebb8e7 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * b8fc097e73709a1f449b66ee31d744caa2ebb8e7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27123) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085









[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 7ccc4b0240e733447c42bbfa48911f768afac57d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25646) 
   * 497cb1c9352a1097ca8ee32d2bf68afba094e263 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25790) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 497cb1c9352a1097ca8ee32d2bf68afba094e263 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25790) 
   * 3aaabcc0d323dc042455d69a707bfccbb3d517b4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25955) 
   





[GitHub] [flink] tillrohrmann commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743521497



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {
+                // Transactions are not idempotent in the curator version we're currently using, so
+                // it is actually possible that we've re-tried a transaction that has already
+                // succeeded. We've ensured that the node hasn't been present prior to executing the
+                // transaction, so we're pretty confident that this is a result of the retry
+                // mechanism.

Review comment:
       If we catch a `NodeExistsException`, then we will retry the loop. Won't this cause another `NodeExistsException`?
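The reviewer's concern can be checked with a toy model. Once a create has committed, re-issuing the same create fails the same way every time. This is a minimal sketch in which `NaiveRetry` and `NodeExistsError` are hypothetical stand-ins, not Flink or Curator classes. The loop is bounded only so that the sketch terminates. A `while (true)` loop whose catch block falls through to the next iteration would never exit.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for KeeperException.NodeExistsException.
class NodeExistsError extends Exception {
    NodeExistsError(String path) {
        super(path);
    }
}

class NaiveRetry {
    final Set<String> nodes = new HashSet<>();

    // Fails if the path is already present, like a ZooKeeper create.
    void create(String path) throws NodeExistsError {
        if (!nodes.add(path)) {
            throw new NodeExistsError(path);
        }
    }

    // Returns how many attempts failed with NodeExists before success or giving up.
    int retryOnNodeExists(String path, int maxAttempts) {
        int failures = 0;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                create(path);
                return failures; // the node did not exist yet
            } catch (NodeExistsError e) {
                failures++; // retrying cannot help: the node is still there
            }
        }
        return failures;
    }
}
```

So the code that handles `NodeExistsException` has to leave the loop, either by treating the exception as success or by rethrowing it.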

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {

Review comment:
       We could also read the contents of `path` to see whether we have written the `serializedStoreHandle`. If this is the case, then the call succeeded and otherwise we can throw the `NodeExistsException`.
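The read-back idea can be sketched as follows. On `NodeExistsException`, fetch the node's data and compare it byte for byte with the handle we tried to write. Identical bytes mean an earlier, retried attempt of ours committed. Anything else is a genuine conflict. This is a minimal sketch with a hypothetical in-memory `ReadBackStore`. In a real client the lookup would presumably be Curator's `client.getData().forPath(path)`. The sketch only checks the state node, not its lock child.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KeeperException.NodeExistsException.
class NodeExistsEx extends Exception {
    NodeExistsEx(String path) {
        super(path);
    }
}

class ReadBackStore {
    final Map<String, byte[]> nodes = new HashMap<>();

    void create(String path, byte[] data) throws NodeExistsEx {
        if (nodes.containsKey(path)) {
            throw new NodeExistsEx(path);
        }
        nodes.put(path, data.clone());
    }

    // On NodeExists, read the node back: identical bytes mean our own earlier attempt
    // committed, so the call counts as successful; otherwise rethrow.
    void createIdempotently(String path, byte[] data) throws NodeExistsEx {
        try {
            create(path, data);
        } catch (NodeExistsEx e) {
            byte[] existing = nodes.get(path); // stands in for client.getData().forPath(path)
            if (!Arrays.equals(existing, data)) {
                throw e;
            }
        }
    }
}
```

Unlike relying on the pre-check alone, this also detects a different writer that got there first, at the cost of one extra read on the (rare) retry path.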

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       Won't this call have the same problem we are trying to solve with this PR? Put differently, won't we run into trouble if this call is retried after the first attempt has already succeeded?
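One possible way to make the write-lock acquisition tolerate a retried create is to compare the lock node's owner with our own session. This works because the lock node is ephemeral and ZooKeeper records the creating session for ephemeral nodes (exposed as `Stat#getEphemeralOwner()`). This is a minimal sketch under that assumption. `EphemeralNodes` and `LockNodeExists` are hypothetical, and the lock path is made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KeeperException.NodeExistsException.
class LockNodeExists extends Exception {
    LockNodeExists(String path) {
        super(path);
    }
}

// Hypothetical model of ephemeral znodes: each node remembers the session that created it.
class EphemeralNodes {
    final Map<String, Long> ownerByPath = new HashMap<>();

    void createEphemeral(String path, long sessionId) throws LockNodeExists {
        if (ownerByPath.containsKey(path)) {
            throw new LockNodeExists(path);
        }
        ownerByPath.put(path, sessionId);
    }

    // Acquires the write lock. A NodeExists caused by a retried create that already
    // succeeded is recognised by comparing the node's owner with our own session id.
    boolean tryAcquire(String lockPath, long sessionId) {
        try {
            createEphemeral(lockPath, sessionId);
            return true;
        } catch (LockNodeExists e) {
            Long owner = ownerByPath.get(lockPath); // stands in for Stat#getEphemeralOwner()
            return owner != null && owner == sessionId;
        }
    }
}
```

If the session expires between attempts, ZooKeeper removes the ephemeral node and the retry simply creates it again. The owner comparison is only needed while the original session is still alive.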

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {
+                // Transactions are not idempotent in the curator version we're currently using, so
+                // it is actually possible that we've re-tried a transaction that has already
+                // succeeded. We've ensured that the node hasn't been present prior to executing the
+                // transaction, so we're pretty confident that this is a result of the retry
+                // mechanism.

Review comment:
       Maybe adding a test for this method could be helpful.
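A test for the retry path could start from something like the following sketch, which needs no ZooKeeper. It assumes a hypothetical in-memory `FlakyStore` whose first create is applied but whose acknowledgement is "lost", so a blind retry runs into a `NodeExistsException` stand-in. `FlakyStore`, `createWithRetry`, `addAndLock`, and both exception classes are illustrative names, not Flink or Curator APIs. The logic mirrors the idea of checking for the node first and then treating `NodeExistsException` as an artifact of the retry.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryIdempotencySketch {

    /** Hypothetical stand-in for KeeperException.NodeExistsException. */
    static class NodeExistsException extends Exception {
        NodeExistsException(String path) {
            super("Node already exists: " + path);
        }
    }

    /** Hypothetical stand-in for a connection loss reported after the server applied the write. */
    static class ConnectionLossException extends Exception {}

    /** In-memory store whose first commit is applied but whose acknowledgement is "lost". */
    static class FlakyStore {
        final Map<String, byte[]> nodes = new HashMap<>();
        private boolean dropNextAck = true;

        boolean exists(String path) {
            return nodes.containsKey(path);
        }

        /** Creates the data node and its lock node together, like the multi-op transaction. */
        void createWithLock(String path, byte[] data)
                throws NodeExistsException, ConnectionLossException {
            if (nodes.containsKey(path)) {
                throw new NodeExistsException(path);
            }
            nodes.put(path, data);
            nodes.put(path + "/lock", new byte[0]);
            if (dropNextAck) {
                dropNextAck = false;
                // The write is durable, but the client never learns that it succeeded.
                throw new ConnectionLossException();
            }
        }
    }

    /** Mimics a connection-loss retry policy wrapped around a non-idempotent create. */
    static void createWithRetry(FlakyStore store, String path, byte[] data)
            throws NodeExistsException {
        while (true) {
            try {
                store.createWithLock(path, data);
                return;
            } catch (ConnectionLossException e) {
                // Retry blindly, as a client-side retry policy would.
            }
        }
    }

    /** Pre-checks existence, then treats NodeExists as "an earlier attempt already committed". */
    static String addAndLock(FlakyStore store, String path, byte[] data) {
        if (store.exists(path)) {
            throw new IllegalStateException("ZooKeeper node " + path + " already exists.");
        }
        try {
            createWithRetry(store, path, data);
            return "CREATED";
        } catch (NodeExistsException e) {
            return "COMMITTED_BY_EARLIER_ATTEMPT";
        }
    }

    public static void main(String[] args) {
        FlakyStore store = new FlakyStore();
        System.out.println(addAndLock(store, "/job-1", new byte[] {42}));
        System.out.println(store.exists("/job-1/lock"));
    }
}
```

The conclusion only holds under the same assumption as the real change: no other writer creates the node between the existence check and the write.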





[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755079258



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,43 +156,40 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
-
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+        if (exists(path).isExisting()) {
+            throw new AlreadyExistException(
+                    String.format("ZooKeeper node %s already exists.", path));
+        }
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
+        } catch (KeeperException.NodeExistsException e) {
+            // Transactions are not idempotent in the curator version we're currently using, so it
+            // is actually possible that we've re-tried a transaction that has already succeeded.
+            // We've ensured that the node hasn't been present prior to executing the transaction, so
+            // we can assume that this is a result of the retry mechanism.
+            return storeHandle;

Review comment:
       I don't think so. `replace` assumes that the node is already present; it simply updates the data in an existing znode, which should be idempotent if retried.
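The difference between the two operations can be illustrated with a hypothetical in-memory `Tree` (not a ZooKeeper or Curator API): a create is rejected when repeated, while an unconditional overwrite of an existing node leaves the same state no matter how often it is retried. This models the unversioned update described in the comment; if the update were conditioned on an expected version, a retry after a successful first attempt could fail with a version mismatch instead.

```java
import java.util.HashMap;
import java.util.Map;

public class ReplaceIdempotencySketch {

    /** Hypothetical in-memory znode tree: path -> data. */
    static final class Tree {
        final Map<String, String> nodes = new HashMap<>();

        /** Like a create: fails if the node is already there, so a blind retry is not idempotent. */
        boolean create(String path, String data) {
            return nodes.putIfAbsent(path, data) == null;
        }

        /** Like an unversioned setData: requires the node, overwrites it, safe to repeat. */
        boolean replace(String path, String data) {
            if (!nodes.containsKey(path)) {
                return false; // analogous to NoNodeException
            }
            nodes.put(path, data);
            return true;
        }
    }

    public static void main(String[] args) {
        Tree tree = new Tree();
        System.out.println(tree.create("/job", "v1"));  // first attempt succeeds
        System.out.println(tree.create("/job", "v1"));  // a retried create is rejected
        System.out.println(tree.replace("/job", "v2"));
        System.out.println(tree.replace("/job", "v2")); // a retried replace has the same effect
        System.out.println(tree.nodes.get("/job"));
    }
}
```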





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * b8fc097e73709a1f449b66ee31d744caa2ebb8e7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27123) 
   * 3d7ad08ea24cb632d0d3db6dc942bb052ab9b464 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27143) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 3aaabcc0d323dc042455d69a707bfccbb3d517b4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25955) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 497cb1c9352a1097ca8ee32d2bf68afba094e263 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25790) 
   * 3aaabcc0d323dc042455d69a707bfccbb3d517b4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 7ccc4b0240e733447c42bbfa48911f768afac57d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25646) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743562938



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       It could indeed result in a `NodeExistsException`. However, this wouldn't introduce inconsistencies, because we don't discard the underlying state.
   
   On the other hand, this would trigger the "unexpected" catch block below... 🤔 





[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743562938



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       We can simply get rid of this write lock. It was meant as a safeguard, but it seems to introduce more corner cases. If we trust the leader election mechanism here, it's not needed. I'll remove it.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {

Review comment:
       We now check whether the node already exists before executing `writeStoreHandleTransactionally()`, so this case should be covered by that check. IMO we can trust the transaction mechanism here (assuming there are no concurrent modifications to the znode).
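The all-or-nothing property being relied on here can be sketched with a hypothetical in-memory `Tree` whose `createAll` stands in for a multi-op transaction: if any target path already exists, nothing is written, so the state node is never created without its lock node (or vice versa). The `path + "/lock"` layout and the helper names are assumptions for illustration, and `synchronized` only models the "no concurrent modifications" assumption within a single JVM.

```java
import java.util.HashMap;
import java.util.Map;

public class MultiCreateSketch {

    /** Hypothetical in-memory stand-in for a multi-op: all creates apply or none do. */
    static final class Tree {
        final Map<String, byte[]> nodes = new HashMap<>();

        /** Returns false and leaves the tree untouched if any target path already exists. */
        synchronized boolean createAll(Map<String, byte[]> ops) {
            for (String path : ops.keySet()) {
                if (nodes.containsKey(path)) {
                    return false;
                }
            }
            nodes.putAll(ops);
            return true;
        }
    }

    /** Builds the two creates: the state-handle node and its (hypothetically placed) lock node. */
    static Map<String, byte[]> stateWithLock(String path, byte[] handle) {
        Map<String, byte[]> ops = new HashMap<>();
        ops.put(path, handle);
        ops.put(path + "/lock", new byte[0]);
        return ops;
    }

    public static void main(String[] args) {
        Tree tree = new Tree();
        tree.nodes.put("/a", new byte[] {0}); // another writer got there first
        System.out.println(tree.createAll(stateWithLock("/a", new byte[] {1})));
        System.out.println(tree.nodes.containsKey("/a/lock"));
        System.out.println(tree.createAll(stateWithLock("/b", new byte[] {2})));
    }
}
```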





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 3aaabcc0d323dc042455d69a707bfccbb3d517b4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25955) 
   * eebbec021e44ecfa2b3ef49c6a0265abce126bbc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * eebbec021e44ecfa2b3ef49c6a0265abce126bbc Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26217) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755105408



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,

Review comment:
       `replace` should never throw this exception (it only works when the node already exists). After this change, neither should `addAndLock`, so I'd say it is safe to remove it.





[GitHub] [flink] tillrohrmann closed pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
tillrohrmann closed pull request #17607:
URL: https://github.com/apache/flink/pull/17607


   



[GitHub] [flink] tillrohrmann commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r746360946



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,43 +156,40 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
-
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+        if (exists(path).isExisting()) {
+            throw new AlreadyExistException(
+                    String.format("ZooKeeper node %s already exists.", path));
+        }
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
+        } catch (KeeperException.NodeExistsException e) {
+            // Transactions are not idempotent in the curator version we're currently using, so it
+            // is actually possible that we've re-tried a transaction that has already succeeded.
+            // We've ensured that the node hasn't been present prior to executing the transaction, so
+            // we can assume that this is a result of the retry mechanism.
+            return storeHandle;

Review comment:
       Does `replace` have the same problem?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,
+                            KeeperException.BadArgumentsException.class,
+                            KeeperException.NoNodeException.class,
+                            KeeperException.NoAuthException.class,
+                            KeeperException.BadVersionException.class,
+                            KeeperException.AuthFailedException.class,
+                            KeeperException.InvalidACLException.class,
+                            KeeperException.SessionMovedException.class,
+                            KeeperException.NotReadOnlyException.class));

Review comment:
       Should we double-check that each of these exceptions is safe, in the sense that seeing it guarantees that no metadata has been written? This does not hold for `NodeExistsException`. Perhaps we could write something, then lose the connection, and then get a `SessionMovedException`. Can this happen? If so, we shouldn't treat `SessionMovedException`, for example, as safe.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,

Review comment:
       If `replace` has the same problem, does it make sense to remove `NodeExistsException` from this list?
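
To make the question concrete, here is a minimal sketch of a stricter classification in which neither `NodeExistsException` nor `SessionMovedException` counts as safe. The exception types are hypothetical stand-ins for ZooKeeper's `KeeperException` subclasses. The cause-chain walk is an assumption about how such a check could look; it is not Flink's `indicatesPossiblyInconsistentState`:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Sketch of a conservative "safe pre-commit exception" check. */
class SafePreCommitSketch {

    // Hypothetical stand-ins for KeeperException subclasses, so that the sketch
    // compiles without ZooKeeper on the classpath.
    static class KeeperLike extends Exception {}

    static class NoAuth extends KeeperLike {}

    static class BadArguments extends KeeperLike {}

    static class NodeExists extends KeeperLike {}

    static class SessionMoved extends KeeperLike {}

    // NodeExists and SessionMoved are left out on the assumption, raised in this review,
    // that either may be reported after our write has already been applied.
    static final Set<Class<? extends KeeperLike>> SAFE_PRE_COMMIT_EXCEPTIONS =
            new HashSet<>(Arrays.asList(NoAuth.class, BadArguments.class));

    /** Returns false only if the first ZooKeeper-like error in the cause chain is known to be safe. */
    static boolean indicatesPossiblyInconsistentState(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof KeeperLike) {
                return !SAFE_PRE_COMMIT_EXCEPTIONS.contains(cur.getClass());
            }
        }
        // No ZooKeeper-level error found: this sketch errs on the side of caution.
        return true;
    }
}
```

Keeping the safe list short trades some spurious `PossibleInconsistentStateException`s for never discarding state that might already be referenced from ZooKeeper.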




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r746362967



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,
+                            KeeperException.BadArgumentsException.class,
+                            KeeperException.NoNodeException.class,
+                            KeeperException.NoAuthException.class,
+                            KeeperException.BadVersionException.class,
+                            KeeperException.AuthFailedException.class,
+                            KeeperException.InvalidACLException.class,
+                            KeeperException.SessionMovedException.class,
+                            KeeperException.NotReadOnlyException.class));

Review comment:
       Should we double-check that each of these exceptions is safe, in the sense that seeing it guarantees that no metadata has been written? This does not hold for `NodeExistsException`. Perhaps we could write something, then lose the connection, and then get a `SessionMovedException`. Can this happen? If so, we shouldn't treat `SessionMovedException`, for example, as safe. Maybe we can look at the fix for https://issues.apache.org/jira/browse/CURATOR-584 to see what they did.







[GitHub] [flink] tillrohrmann commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743521497



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {
+                // Transactions are not idempotent in the curator version we're currently using, so
+                // it is actually possible that we've re-tried a transaction that has already
+                // succeeded. We've ensured that the node hasn't been present prior executing the
+                // transaction, so we're pretty confident that this is a result of the retry
+                // mechanism.

Review comment:
       If we catch a `NodeExistsException`, then we will retry the loop. Won't this cause another `NodeExistsException`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {

Review comment:
       We could also read the contents of `path` to see whether we have written the `serializedStoreHandle`. If so, the call succeeded; otherwise, we can rethrow the `NodeExistsException`.
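
Here is a minimal sketch of that read-back check. A hypothetical in-memory map stands in for ZooKeeper; with Curator, the read would presumably be a `client.getData().forPath(path)` call. The sketch assumes serialized handles are unique per write, for example because each one points at a freshly stored file, so that byte equality identifies our own earlier write:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Read-back check after a NodeExists on a possibly retried create (hypothetical in-memory store). */
class ReadBackSketch {

    final Map<String, byte[]> nodes = new HashMap<>();

    /**
     * Returns normally if the existing node holds exactly the handle we tried to write,
     * i.e. an earlier attempt of ours committed; otherwise reports a genuine conflict.
     */
    void confirmOwnWriteOrThrow(String path, byte[] serializedStoreHandle) {
        byte[] stored = nodes.get(path);
        // Compare contents, not references: bytes read back from the server are a new array.
        if (stored == null || !Arrays.equals(stored, serializedStoreHandle)) {
            // In the real store, this is where the original NodeExistsException would be rethrown.
            throw new IllegalStateException(
                    "Node " + path + " does not hold the expected state handle.");
        }
    }
}
```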

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
       Won't this call have the same problem we are trying to solve with this PR? Put differently, won't we run into trouble if this call is retried after the first attempt has already succeeded?
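
One option, assuming the write lock stays an ephemeral node: ZooKeeper records the owning session of an ephemeral node (`Stat#getEphemeralOwner()`), and a session id survives reconnects until the session expires. A `NodeExistsException` on the lock could therefore be checked against the current session id (`ZooKeeper#getSessionId()`). The sketch below uses a hypothetical in-memory stand-in for both:

```java
import java.util.HashMap;
import java.util.Map;

/** Tolerating a retried ephemeral create by checking which session owns the node. */
class WriteLockSketch {

    /** Stand-in for KeeperException.NodeExistsException. */
    static class NodeExists extends Exception {}

    // lock path -> id of the creating session; stands in for Stat#getEphemeralOwner()
    final Map<String, Long> ephemeralOwners = new HashMap<>();

    void createEphemeral(String path, long sessionId) throws NodeExists {
        if (ephemeralOwners.containsKey(path)) {
            throw new NodeExists();
        }
        ephemeralOwners.put(path, sessionId);
    }

    /** A NodeExists is only fatal if the lock node belongs to a different session. */
    void acquireWriteLock(String lockPath, long ourSessionId) {
        try {
            createEphemeral(lockPath, ourSessionId);
        } catch (NodeExists e) {
            Long owner = ephemeralOwners.get(lockPath);
            if (owner == null || owner.longValue() != ourSessionId) {
                throw new IllegalStateException(
                        "Write lock " + lockPath + " is not held by this session.", e);
            }
            // Same session: most likely our own earlier attempt succeeded and was retried.
        }
    }
}
```

The trade-off is that a lock left behind by an earlier call from the same session, for example one whose cleanup failed, would also be accepted.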

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {
+                // Transactions are not idempotent in the curator version we're currently using, so
+                // it is actually possible that we've re-tried a transaction that has already
+                // succeeded. We've ensured that the node hasn't been present prior executing the
+                // transaction, so we're pretty confident that this is a result of the retry
+                // mechanism.

Review comment:
       Adding a test for this method might be helpful.
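
Here is one way such a test could be set up, sketched with hypothetical stand-ins rather than Flink's test utilities or a real ZooKeeper server. A fake client applies the transaction and then "loses" the reply, so the retry observes `NodeExistsException`. The method under test is a simplified candidate, not the code in this PR. It retries only on connection loss, caps the number of attempts, and accepts a `NodeExistsException` only if an earlier reply was lost and the stored bytes match:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class TransactionalWriteScenario {

    static class NodeExists extends Exception {}

    static class ConnectionLoss extends Exception {}

    /** Fake server: both creates of the transaction are applied atomically. */
    static final class FakeClient {
        final Map<String, byte[]> nodes = new HashMap<>();
        boolean loseNextReply; // if set, the next successful commit "loses" its reply

        void commit(String path, byte[] data, String lockPath) throws NodeExists, ConnectionLoss {
            if (nodes.containsKey(path) || nodes.containsKey(lockPath)) {
                throw new NodeExists();
            }
            nodes.put(path, data);
            nodes.put(lockPath, new byte[0]);
            if (loseNextReply) {
                loseNextReply = false;
                throw new ConnectionLoss(); // committed, but the client never hears back
            }
        }
    }

    /** Candidate logic under test: retry on connection loss, verify on NodeExists. */
    static void writeTransactionally(FakeClient client, String path, byte[] handle)
            throws NodeExists {
        String lockPath = path + "/lock"; // hypothetical lock-node layout
        boolean retried = false;
        for (int attempt = 0; attempt < 3; attempt++) {
            try {
                client.commit(path, handle, lockPath);
                return;
            } catch (ConnectionLoss e) {
                retried = true;
            } catch (NodeExists e) {
                // Only after a lost reply can the existing node be our own write.
                if (retried && Arrays.equals(client.nodes.get(path), handle)) {
                    return;
                }
                throw e;
            }
        }
        throw new IllegalStateException("Gave up after repeated connection loss.");
    }
}
```

A test would then assert that the lost-reply case returns normally with both nodes present, and that a pre-existing node with different content still surfaces `NodeExists`.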







[GitHub] [flink] tillrohrmann commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r743521497



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {
+                // Transactions are not idempotent in the curator version we're currently using, so
+                // it is actually possible that we've re-tried a transaction that has already
+                // succeeded. We've ensured that the node hasn't been present prior executing the
+                // transaction, so we're pretty confident that this is a result of the retry
+                // mechanism.

Review comment:
       If we catch a `NodeExistsException`, then we will retry the loop. Won't this cause another `NodeExistsException`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {

Review comment:
      We could also read the contents of `path` to check whether it holds the `serializedStoreHandle` we wrote. If it does, the earlier call succeeded; otherwise, we can rethrow the `NodeExistsException`.
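The read-back idea can be sketched without a ZooKeeper dependency. `NodeStore`, the `NodeExistsException` stand-in and `InMemoryNodeStore` are hypothetical and are not Curator's or ZooKeeper's API. In Curator the read-back would presumably go through `client.getData().forPath(path)`. Byte equality is only a heuristic: it assumes no other writer stores identical bytes under the same path. That seems plausible here because the serialized handle points to freshly stored state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Stand-in for KeeperException.NodeExistsException (hypothetical, dependency-free). */
class NodeExistsException extends Exception {
    NodeExistsException(String path) {
        super("Node already exists: " + path);
    }
}

/** Hypothetical minimal view of a ZooKeeper-like client; not Curator's API. */
interface NodeStore {
    /** Creates the node; throws if a node already exists at {@code path}. */
    void create(String path, byte[] data) throws NodeExistsException;

    /** Returns the node's data, or null if there is no node at {@code path}. */
    byte[] getData(String path);
}

final class IdempotentCreate {
    private IdempotentCreate() {}

    /**
     * Creates {@code path} with {@code data}. An existing node counts as success only if it
     * already holds exactly our bytes, i.e. an earlier attempt of this call presumably wrote it.
     */
    static void createIdempotently(NodeStore store, String path, byte[] data)
            throws NodeExistsException {
        try {
            store.create(path, data);
        } catch (NodeExistsException e) {
            byte[] existing = store.getData(path);
            if (existing == null || !Arrays.equals(existing, data)) {
                // Different content (or the node disappeared meanwhile): report the conflict.
                throw e;
            }
            // Same content: treat the earlier, retried attempt as the successful one.
        }
    }
}

/** In-memory NodeStore for exercising the sketch. */
class InMemoryNodeStore implements NodeStore {
    private final Map<String, byte[]> nodes = new HashMap<>();

    @Override
    public void create(String path, byte[] data) throws NodeExistsException {
        if (nodes.containsKey(path)) {
            throw new NodeExistsException(path);
        }
        nodes.put(path, data.clone());
    }

    @Override
    public byte[] getData(String path) {
        byte[] data = nodes.get(path);
        return data == null ? null : data.clone();
    }
}
```

Calling `createIdempotently` a second time with the same bytes returns normally. A call with different bytes rethrows the `NodeExistsException`.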

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));

Review comment:
      Won't this call have the same problem we're trying to solve with this PR? Put differently, won't we have a problem if this call is retried after the first attempt has already succeeded?
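Here is one possible way to handle this concern, sketched under stated assumptions. ZooKeeper records which session owns an ephemeral node in the `ephemeralOwner` field of the node's `Stat`. A `NodeExistsException` for the write lock could therefore be tolerated when that owner is the client's own session. `EphemeralNodes`, `LockExistsException` and `WriteLock` are hypothetical stand-ins, not Flink, Curator or ZooKeeper classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for KeeperException.NodeExistsException (hypothetical). */
class LockExistsException extends Exception {
    LockExistsException(String path) {
        super("Lock node already exists: " + path);
    }
}

/** Hypothetical in-memory model of ephemeral nodes and the session that owns each one. */
class EphemeralNodes {
    private final Map<String, Long> owners = new HashMap<>();

    void createEphemeral(String path, long sessionId) throws LockExistsException {
        if (owners.containsKey(path)) {
            throw new LockExistsException(path);
        }
        owners.put(path, sessionId);
    }

    /** Owning session id, or null if the node does not exist (cf. Stat#getEphemeralOwner). */
    Long ephemeralOwner(String path) {
        return owners.get(path);
    }
}

final class WriteLock {
    private WriteLock() {}

    /** Acquires the lock, tolerating a retried create whose first attempt already succeeded. */
    static void acquire(EphemeralNodes nodes, String lockPath, long ourSessionId)
            throws LockExistsException {
        try {
            nodes.createEphemeral(lockPath, ourSessionId);
        } catch (LockExistsException e) {
            Long owner = nodes.ephemeralOwner(lockPath);
            if (owner == null || owner != ourSessionId) {
                // Held by another session (or already gone): a genuine conflict for the caller.
                throw e;
            }
            // Our own session owns the node, so the earlier attempt went through.
        }
    }
}
```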

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {
+                // Transactions are not idempotent in the curator version we're currently using, so
+                // it is actually possible that we've re-tried a transaction that has already
+                // succeeded. We've ensured that the node hasn't been present prior executing the
+                // transaction, so we're pretty confident that this is a result of the retry
+                // mechanism.

Review comment:
       Maybe adding a test for this method could be helpful.
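A test along these lines could pin down the retry behaviour. The sketch models the failure the PR describes in three steps:

1. The first transaction is applied on the server, but its response is lost.
2. The client retries the transaction.
3. The retry fails with `NodeExists`.

`FlakyTransactionalStore`, `TransactionalWriter` and the exception stand-ins are hypothetical. The `retried` flag also replaces the PR's actual safeguard, which checks under the write lock that the node did not exist beforehand. The sketch needs the flag because the retries in question happen inside Curator rather than in Flink's code.

```java
import java.util.HashSet;
import java.util.Set;

/** Stand-ins for KeeperException.NodeExistsException / ConnectionLossException (hypothetical). */
class NodeExists extends Exception {}

class ConnectionLoss extends Exception {}

/** Fake server: the first commit is applied, but the client never sees its response. */
class FlakyTransactionalStore {
    final Set<String> nodes = new HashSet<>();
    int commits = 0;
    private boolean loseNextResponse = true;

    /** Atomically creates both nodes, like the two-create transaction in the PR. */
    void createBoth(String path, String lockPath) throws NodeExists, ConnectionLoss {
        commits++;
        if (nodes.contains(path) || nodes.contains(lockPath)) {
            throw new NodeExists();
        }
        nodes.add(path);
        nodes.add(lockPath);
        if (loseNextResponse) {
            loseNextResponse = false;
            throw new ConnectionLoss(); // applied on the server, but reported as a failure
        }
    }
}

final class TransactionalWriter {
    private TransactionalWriter() {}

    /** Retries on connection loss and treats NodeExists after a retry as success. */
    static void write(FlakyTransactionalStore store, String path, String lockPath)
            throws NodeExists {
        boolean retried = false;
        while (true) {
            try {
                store.createBoth(path, lockPath);
                return;
            } catch (ConnectionLoss e) {
                retried = true; // a real retry policy would back off here
            } catch (NodeExists e) {
                if (retried) {
                    return; // our earlier attempt presumably committed
                }
                throw e; // the node existed before we ever wrote it
            }
        }
    }
}
```

The useful assertions are that the write succeeds after exactly one retry and leaves both nodes in place. A node that existed before the first attempt must still surface as `NodeExists`.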







[GitHub] [flink] XComp commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
XComp commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r742141231



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processing writing to the same path. This should never happen.",

Review comment:
       ```suggestion
                           "Write lock is already taken, which signals there are multiple processes writing to the same path. This should never happen.",
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */

Review comment:
      This comment is helpful 👍

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processing writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {
+                // Transactions are not idempotent in the curator version we're currently used, so

Review comment:
       ```suggestion
                   // Transactions are not idempotent in the curator version we're currently using, so
   ```







[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r742796681



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processing writing to the same path. This should never happen.",
+                        e);
+            }
+            if (exists(path).isExisting()) {
+                throw new AlreadyExistException(
+                        String.format("ZooKeeper node %s already exists.", path));
+            }
+            return doAddAndLock(path, state);
+        } finally {
+            // Release the write-lock.
+            deleteIfExists(getWriteLockPath(path));
+        }
+    }
 
-        RetrievableStateHandle<T> storeHandle = storage.store(state);
-
-        byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
-
+    private RetrievableStateHandle<T> doAddAndLock(String path, T state) throws Exception {
+        final RetrievableStateHandle<T> storeHandle = storage.store(state);
+        final byte[] serializedStoreHandle = serializeOrDiscard(storeHandle);
         try {
             writeStoreHandleTransactionally(path, serializedStoreHandle);
             return storeHandle;
         } catch (Exception e) {
             if (indicatesPossiblyInconsistentState(e)) {
                 throw new PossibleInconsistentStateException(e);
             }
-
-            // in any other failure case: discard the state
+            // In case of any other failure, discard the state and rethrow the exception.
             storeHandle.discardState();
-
-            // We wrap the exception here so that it could be caught in DefaultJobGraphStore
-            throw ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class)
-                    .map(
-                            nee ->
-                                    new AlreadyExistException(
-                                            "ZooKeeper node " + path + " already exists.", nee))
-                    .orElseThrow(() -> e);
+            throw e;
         }
     }
 
     // this method is provided for the sole purpose of easier testing
     @VisibleForTesting
     protected void writeStoreHandleTransactionally(String path, byte[] serializedStoreHandle)
             throws Exception {
-        // Write state handle (not the actual state) to ZooKeeper. This is expected to be
-        // smaller than the state itself. This level of indirection makes sure that data in
-        // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but
-        // the state can be larger.
-        // Create the lock node in a transaction with the actual state node. That way we can
+        // Write state handle (not the actual state) to ZooKeeper. This is expected to be smaller
+        // than the state itself. This level of indirection makes sure that data in ZooKeeper is
+        // small, because ZooKeeper is designed for data in the KB range, but the state can be
+        // larger. Create the lock node in a transaction with the actual state node. That way we can
         // prevent race conditions with a concurrent delete operation.
-        client.inTransaction()
-                .create()
-                .withMode(CreateMode.PERSISTENT)
-                .forPath(path, serializedStoreHandle)
-                .and()
-                .create()
-                .withMode(CreateMode.EPHEMERAL)
-                .forPath(getLockPath(path))
-                .and()
-                .commit();
+        while (true) {
+            try {
+                client.inTransaction()
+                        .create()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, serializedStoreHandle)
+                        .and()
+                        .create()
+                        .withMode(CreateMode.EPHEMERAL)
+                        .forPath(getLockPath(path))
+                        .and()
+                        .commit();
+                break;
+            } catch (KeeperException.NodeExistsException e) {
+                // Transactions are not idempotent in the curator version we're currently used, so

Review comment:
       👍 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -154,54 +158,75 @@ public ZooKeeperStateHandleStore(
             throws PossibleInconsistentStateException, Exception {
         checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
         checkNotNull(state, "State");
-
         final String path = normalizePath(pathInZooKeeper);
+        try {
+            try {
+                // Obtain the write-lock.
+                client.create().withMode(CreateMode.EPHEMERAL).forPath(getWriteLockPath(path));
+            } catch (KeeperException.NodeExistsException e) {
+                // There should always be a single JobMaster for each job, so we should never
+                // encounter this.
+                throw new IllegalStateException(
+                        "Write lock is already taken, which signals there are multiple processing writing to the same path. This should never happen.",

Review comment:
       👍 







[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 7ccc4b0240e733447c42bbfa48911f768afac57d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25646) 
   * 497cb1c9352a1097ca8ee32d2bf68afba094e263 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954764965


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 7ccc4b0240e733447c42bbfa48911f768afac57d (Fri Oct 29 13:55:45 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
    <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 497cb1c9352a1097ca8ee32d2bf68afba094e263 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25790) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * 3aaabcc0d323dc042455d69a707bfccbb3d517b4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25955) 
   * 3f895fa6df62764dfbd6e244865be4b3c14fb1dc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dmvk commented on a change in pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
dmvk commented on a change in pull request #17607:
URL: https://github.com/apache/flink/pull/17607#discussion_r755120267



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
##########
@@ -87,18 +88,19 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-    @VisibleForTesting
-    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS =
-            newHashSet(
-                    KeeperException.NodeExistsException.class,
-                    KeeperException.BadArgumentsException.class,
-                    KeeperException.NoNodeException.class,
-                    KeeperException.NoAuthException.class,
-                    KeeperException.BadVersionException.class,
-                    KeeperException.AuthFailedException.class,
-                    KeeperException.InvalidACLException.class,
-                    KeeperException.SessionMovedException.class,
-                    KeeperException.NotReadOnlyException.class);
+    /** Pre-commit exceptions that don't imply data inconsistency. */
+    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
+            new HashSet<>(
+                    Arrays.asList(
+                            KeeperException.NodeExistsException.class,
+                            KeeperException.BadArgumentsException.class,
+                            KeeperException.NoNodeException.class,
+                            KeeperException.NoAuthException.class,
+                            KeeperException.BadVersionException.class,
+                            KeeperException.AuthFailedException.class,
+                            KeeperException.InvalidACLException.class,
+                            KeeperException.SessionMovedException.class,
+                            KeeperException.NotReadOnlyException.class));

Review comment:
       One more suggestion: let's merge it as is and follow up in a different PR. It's still a big improvement over the original state, and we'd need the patch for the transaction code path anyway.
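The diff above narrows the exception set to "safe" pre-commit failures, meaning failures that do not imply data inconsistency. A minimal Java sketch of how such a set could be consulted follows. It is not the code from this PR: the helper `isSafePreCommitFailure` is hypothetical, the set is trimmed to two entries, and the nested exception classes are hypothetical stand-ins for the `org.apache.zookeeper.KeeperException` subclasses, so the snippet compiles without the ZooKeeper dependency.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class PreCommitCheck {

    // Hypothetical stand-ins for org.apache.zookeeper.KeeperException and its
    // subclasses, so this sketch compiles without the ZooKeeper dependency.
    static class KeeperException extends Exception {}
    static class NodeExistsException extends KeeperException {}
    static class NoNodeException extends KeeperException {}
    static class ConnectionLossException extends KeeperException {}

    // Trimmed-down analogue of SAFE_PRE_COMMIT_EXCEPTIONS: failures reported
    // before the transaction commits, so no partial write can be left behind.
    private static final Set<Class<? extends KeeperException>> SAFE_PRE_COMMIT_EXCEPTIONS =
            new HashSet<>(Arrays.asList(NodeExistsException.class, NoNodeException.class));

    /** Hypothetical helper: exact-class lookup in the "safe" set. */
    static boolean isSafePreCommitFailure(Exception e) {
        return SAFE_PRE_COMMIT_EXCEPTIONS.contains(e.getClass());
    }

    public static void main(String[] args) {
        System.out.println(isSafePreCommitFailure(new NodeExistsException()));     // true
        System.out.println(isSafePreCommitFailure(new ConnectionLossException())); // false
    }
}
```

Anything outside the set, such as a connection loss where the transaction may or may not have been applied, would be treated as possibly inconsistent. That is exactly the case in which a retry can run into `NodeExistsException` for a write that already succeeded.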







[GitHub] [flink] flinkbot edited a comment on pull request #17607: [FLINK-24543][runtime] Avoid possible inconsistencies of ZookeeperSta…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17607:
URL: https://github.com/apache/flink/pull/17607#issuecomment-954762085


   ## CI report:
   
   * b8fc097e73709a1f449b66ee31d744caa2ebb8e7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27123) 
   * 3d7ad08ea24cb632d0d3db6dc942bb052ab9b464 UNKNOWN
   

