Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/11 06:23:13 UTC

[GitHub] [flink] Myasuka commented on a change in pull request #19050: [FLINK-26592][state/changelog] Schedule uploads outside synchronized block

Myasuka commented on a change in pull request #19050:
URL: https://github.com/apache/flink/pull/19050#discussion_r824419394



##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -204,22 +205,25 @@ public SequenceNumber nextSequenceNumber() {
             SequenceNumberRange range = SequenceNumberRange.generic(from, activeSequenceNumber);
             if (range.size() == readyToReturn.size()) {
                 checkState(toUpload.isEmpty());
-                return completedFuture(buildHandle(keyGroupRange, readyToReturn, 0L));
+                future.complete(buildHandle(keyGroupRange, readyToReturn, 0L));
             } else {
-                CompletableFuture<ChangelogStateHandleStreamImpl> future =
-                        new CompletableFuture<>();
                 uploadCompletionListeners.add(
                         new UploadCompletionListener(keyGroupRange, range, readyToReturn, future));
                 if (!toUpload.isEmpty()) {
-                    uploader.upload(
+                    uploadTask =
                             new UploadTask(
                                     toUpload.values(),
                                     this::handleUploadSuccess,
-                                    this::handleUploadFailure));
+                                    this::handleUploadFailure);
                 }
-                return future;
             }
         }
+        // upload outside the synchronized block to prevent deadlock: capacity might be needed,
+        // but upload threads need to acquire lock in completion callbacks
+		if (uploadTask != null) {
+			uploader.upload(uploadTask);
+		}

Review comment:
       If we dig into the root cause, the deadlock happens because `StateChangeUploadScheduler#upload` can block indefinitely, and it only stops waiting once `FsStateChangelogWriter#handleUploadSuccess` has finished. In other words, we create a coupling between the two that is not obvious from the interface design.
   
   `FsStateChangelogWriter` should not need to know how `StateChangeUploadScheduler` is implemented, and the concept of `capacity` in `StateChangeUploadScheduler` should not be visible to the caller (the code comment above actually exposes that concept).
   
   In my opinion, a better solution is to let `StateChangeUploadScheduler` handle this deadlock itself: `StateChangeUploadScheduler#upload` should not block, and a separate asynchronous call should be introduced that waits for capacity to be released. Though I have not implemented it, I think this would be cleaner than the current solution.
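
   To make the suggestion concrete, here is a rough sketch of what a non-blocking `upload` with an asynchronous wait for capacity could look like. This is only an illustration under my own assumptions: `NonBlockingScheduler`, `Pending`, `release`, and the byte-based capacity accounting are hypothetical names and do not reflect the actual `StateChangeUploadScheduler` interface.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch only; this is NOT Flink's StateChangeUploadScheduler. */
class NonBlockingScheduler {

    /** A queued request together with the future that signals its admission. */
    private static final class Pending {
        final long size;
        final CompletableFuture<Void> admitted = new CompletableFuture<>();

        Pending(long size) {
            this.size = size;
        }
    }

    private final Object lock = new Object();
    private final Queue<Pending> waiting = new ArrayDeque<>();
    private final long capacity;
    private long available;

    NonBlockingScheduler(long capacity) {
        this.capacity = capacity;
        this.available = capacity;
    }

    /**
     * Never blocks the caller. The returned future completes once capacity
     * has been reserved for the request, immediately or after a later release.
     */
    CompletableFuture<Void> upload(long size) {
        if (size > capacity) {
            // Assumption of this sketch: a single request must fit the total capacity.
            throw new IllegalArgumentException("request exceeds total capacity");
        }
        Pending pending = new Pending(size);
        boolean admitNow;
        synchronized (lock) {
            // Keep FIFO order: do not overtake requests that are already waiting.
            admitNow = waiting.isEmpty() && size <= available;
            if (admitNow) {
                available -= size;
            } else {
                waiting.add(pending);
            }
        }
        if (admitNow) {
            // Complete outside the lock so callbacks never run while it is held.
            pending.admitted.complete(null);
        }
        return pending.admitted;
    }

    /** Called when an upload finishes; frees capacity and admits waiting requests. */
    void release(long size) {
        List<Pending> ready = new ArrayList<>();
        synchronized (lock) {
            available += size;
            while (!waiting.isEmpty() && waiting.peek().size <= available) {
                Pending next = waiting.poll();
                available -= next.size;
                ready.add(next);
            }
        }
        for (Pending p : ready) {
            p.admitted.complete(null);
        }
    }
}
```

   With something along these lines, the writer could call `upload` while still holding its own lock, because the call returns immediately and the capacity wait is expressed through the returned future rather than through a blocked thread. The caller would then not need to know that capacity exists at all.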



