Posted to issues@flink.apache.org by "reswqa (via GitHub)" <gi...@apache.org> on 2023/04/19 13:30:41 UTC

[GitHub] [flink] reswqa commented on a diff in pull request #22422: [FLINK-31838][runtime] Moves thread handling for leader event listener calls from DefaultMultipleComponentLeaderElectionService into DefaultLeaderElectionService

reswqa commented on code in PR #22422:
URL: https://github.com/apache/flink/pull/22422#discussion_r1171309501


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java:
##########
@@ -36,14 +37,44 @@ public interface LeaderElectionEventHandler {
      * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
      *
      * @param newLeaderSessionId the valid leader session id
+     * @param executorService the {@link ExecutorService} this call should be executed on.
      */
-    void onGrantLeadership(UUID newLeaderSessionId);
+    void onGrantLeadershipAsync(UUID newLeaderSessionId, ExecutorService executorService);
+
+    /**
+     * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
+     *
+     * @param newLeaderSessionId the valid leader session id
+     */
+    void onGrantLeadershipAsync(UUID newLeaderSessionId);

Review Comment:
   I'm a bit confused: why does this method name carry the `Async` suffix? I understand the goal is to make `DefaultMultipleComponentLeaderElectionService` use the executor in `DefaultLeaderElectionService`, but that is hard to infer from the interface contract alone. At the very least, the Javadoc should emphasize that this method must not be executed on the caller's thread.
   
   The following is the doc for the `thenRunAsync` method of `CompletionStage`, which looks much more intuitive.
   ```
       /*
        * Returns a new CompletionStage that, when this stage completes
        * normally, executes the given action using this stage's default
        * asynchronous execution facility.
        */
       public CompletionStage<Void> thenRunAsync(Runnable action);
   ```
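
For reference, the naming convention mentioned in the comment above can be seen in a small JDK-only sketch. It is not Flink code: the class name `AsyncSuffixSketch`, its helper methods, and the thread name `leader-ops` are made up for illustration. It compares `thenRun` with the `thenRunAsync(Runnable, Executor)` overload, whose action runs on the supplied executor instead of the caller's thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not part of Flink.
public class AsyncSuffixSketch {

    /** Returns the name of the thread that ran an action passed to thenRunAsync. */
    static String threadUsedByThenRunAsync(ExecutorService executor) {
        AtomicReference<String> threadName = new AtomicReference<>();
        CompletableFuture.completedFuture(null)
                // The action is handed to the given executor.
                .thenRunAsync(() -> threadName.set(Thread.currentThread().getName()), executor)
                .join();
        return threadName.get();
    }

    /** Returns the name of the thread that ran an action passed to thenRun. */
    static String threadUsedByThenRun() {
        AtomicReference<String> threadName = new AtomicReference<>();
        CompletableFuture.completedFuture(null)
                // The stage is already complete, so in practice the action
                // runs right away on the calling thread.
                .thenRun(() -> threadName.set(Thread.currentThread().getName()))
                .join();
        return threadName.get();
    }

    public static void main(String[] args) {
        // "leader-ops" is an arbitrary thread name chosen for this sketch.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "leader-ops"));
        try {
            System.out.println("thenRun ran on:      " + threadUsedByThenRun());
            System.out.println("thenRunAsync ran on: " + threadUsedByThenRunAsync(executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

In the JDK, the `Async` suffix therefore tells the reader where the action runs, which is the kind of guarantee the comment asks the Javadoc of `onGrantLeadershipAsync` to state explicitly.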



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -67,7 +72,20 @@
     // this.running=true ensures that leaderContender != null
     private LeaderElectionDriver leaderElectionDriver;
 
+    @GuardedBy("lock")
+    private final ExecutorService leadershipOperationExecutor;

Review Comment:
   Why is this field guarded by the lock?



##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java:
##########
@@ -195,12 +196,14 @@ private class LeaderCallbackHandlerImpl extends KubernetesLeaderElector.LeaderCa
 
         @Override
         public void isLeader() {
-            leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+            leaderElectionEventHandler.onGrantLeadershipAsync(
+                    UUID.randomUUID(), Executors.newDirectExecutorService());

Review Comment:
   Could `Executors.directExecutor()` be used here instead of `Executors.newDirectExecutorService()`? 🤔
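
As background for the question above, here is a minimal JDK-only sketch of what a "direct" executor does. It assumes the usual meaning of the term: the task runs immediately on the thread that submits it. The class `DirectExecutorSketch` and the `DIRECT` constant are hypothetical stand-ins, not Flink's `Executors` helpers, whose implementation is not shown in this thread.

```java
import java.util.concurrent.Executor;

// Hypothetical illustration class; not part of Flink.
public class DirectExecutorSketch {

    /** Stand-in direct executor: runs each task immediately on the calling thread. */
    static final Executor DIRECT = Runnable::run;

    /**
     * Returns the name of the thread that executed a task on the given executor.
     * Only meaningful for executors that finish the task before execute() returns,
     * such as DIRECT above.
     */
    static String threadThatRan(Executor executor) {
        String[] name = new String[1];
        executor.execute(() -> name[0] = Thread.currentThread().getName());
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("task ran on:   " + threadThatRan(DIRECT));
    }
}
```

With an executor like this, a call such as `onGrantLeadershipAsync(UUID, ExecutorService)` would still execute on the driver's own thread, so the `Async` suffix describes the interface and not necessarily every call site.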



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -67,7 +72,20 @@
     // this.running=true ensures that leaderContender != null
     private LeaderElectionDriver leaderElectionDriver;
 
+    @GuardedBy("lock")
+    private final ExecutorService leadershipOperationExecutor;
+
+    @VisibleForTesting

Review Comment:
   Why does it need to be marked as `VisibleForTesting` here? Isn't there still a lot of external production code calling it?


