Posted to issues@flink.apache.org by "XComp (via GitHub)" <gi...@apache.org> on 2023/04/12 13:29:33 UTC

[GitHub] [flink] XComp opened a new pull request, #22384: [WIP][FLINK-31776][runtime] Introduces LeaderElection sub-interface

XComp opened a new pull request, #22384:
URL: https://github.com/apache/flink/pull/22384

   PR order:
   - [ ] FLINK-30765 (PR https://github.com/apache/flink/pull/21742)
   - [ ] FLINK-31768 (PR https://github.com/apache/flink/pull/22379)
   - [ ] FLINK-31773 (PR https://github.com/apache/flink/pull/22380)
   - [ ] **this** FLINK-31776
   
   ## What is the purpose of the change
   
   This change introduces a new interface `LeaderElectionService.LeaderElection` which serves as a proxy between the `LeaderContender` and the `LeaderElectionService`. Future PRs will introduce a contender ID which can be hidden in `LeaderElection`. That way, `LeaderContender` wouldn't be aware of this implementation detail of the `DefaultLeaderElectionService`.
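
The proxy idea described above can be sketched roughly as follows. This is a minimal illustration and not the code of this PR: `ToyLeaderElectionService` and its `grant` method are hypothetical, and the interfaces are cut down to the methods needed for the example.

```java
import java.util.UUID;

/** Simplified stand-ins for the types named in this PR; not Flink's actual signatures. */
class LeaderElectionProxySketch {

    interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Handle a contender talks to instead of the service itself. */
    interface LeaderElection {
        void startLeaderElection(LeaderContender contender);

        boolean hasLeadership(UUID leaderSessionId);
    }

    /** Toy service (hypothetical): one contender, one issued session ID. */
    static class ToyLeaderElectionService {
        private LeaderContender contender;
        private UUID issuedSessionId;

        LeaderElection createLeaderElection() {
            final ToyLeaderElectionService service = this;
            // The handle only forwards calls. Service-side details (for example a
            // contender ID) could be kept here without the contender seeing them.
            return new LeaderElection() {
                @Override
                public void startLeaderElection(LeaderContender contender) {
                    service.register(contender);
                }

                @Override
                public boolean hasLeadership(UUID leaderSessionId) {
                    return service.hasLeadership(leaderSessionId);
                }
            };
        }

        private void register(LeaderContender contender) {
            this.contender = contender;
        }

        private boolean hasLeadership(UUID leaderSessionId) {
            return leaderSessionId.equals(issuedSessionId);
        }

        /** Simulates the backend granting leadership to the registered contender. */
        void grant(UUID leaderSessionId) {
            issuedSessionId = leaderSessionId;
            if (contender != null) {
                contender.grantLeadership(leaderSessionId);
            }
        }
    }

    public static void main(String[] args) {
        ToyLeaderElectionService service = new ToyLeaderElectionService();
        LeaderElection election = service.createLeaderElection();

        UUID[] granted = new UUID[1];
        election.startLeaderElection(id -> granted[0] = id);

        UUID sessionId = UUID.randomUUID();
        service.grant(sessionId);

        System.out.println("granted matches: " + sessionId.equals(granted[0]));
        System.out.println("has leadership: " + election.hasLeadership(sessionId));
    }
}
```

The contender only ever holds the `LeaderElection` handle, so whatever the service needs to track per contender can stay behind it.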
   
   ## Brief change log
   
   * TODO
   
   ## Verifying this change
   
   * Failing tests are updated
   * Additional Preconditions are added to ensure consistent behavior
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1560977251

   FYI: the [CI failure](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49304&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=13641) happened due to FLINK-26635
   
   I addressed the comments and rebased the follow-up PRs based on these changes. 



[GitHub] [flink] XComp commented on pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1557509995

   I force-pushed after rebasing this PR onto #22623.



[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1200287477


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java:
##########
@@ -120,6 +121,7 @@ void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception
         long sleepingTime = 1000;
 
         LeaderElectionService leaderElectionService = null;
+        LeaderElection leaderElection = null;

Review Comment:
   You're right - it's not used for now but will be used with #22390 where we move the deregistration of the `LeaderElection` from `LeaderElectionService.stop()` into `LeaderElection.close()`. I added some documentation to reflect that.




[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1203508182


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code TestingLeaderElection} implements simple leader election for test cases where no {@code
+ * LeaderElectionService} is required.
+ */
+public class TestingLeaderElection implements LeaderElection {
+
+    /**
+     * Is {@code null} if the {@code LeaderElection} isn't started.
+     *
+     * @see LeaderElection#startLeaderElection(LeaderContender)
+     */
+    @Nullable private LeaderContender contender = null;
+
+    @Nullable private CompletableFuture<LeaderInformation> confirmationFuture = null;

Review Comment:
   In addition to what I mentioned in [my previous comment](https://github.com/apache/flink/pull/22384#discussion_r1202637964): This is a slight code change compared to the old `TestingLeaderElectionService`, where we didn't cancel the `confirmationFuture` in `TestingLeaderElectionService.stop()`. Now, we do cancel the future. But this change makes sense for the aforementioned reasons. I'm going to add a comment to document that as well.
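
The behavioral difference mentioned in this comment (a pending confirmation future is now cancelled on cleanup) might look roughly like the sketch below. The class is a hypothetical, stripped-down stand-in and not the actual `TestingLeaderElection`; only the method names `isLeader` and `triggerContenderCleanup` are borrowed from the diff.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Simplified stand-in for a testing leader election; illustrative only. */
class ConfirmationFutureCleanupSketch {

    private CompletableFuture<UUID> confirmationFuture;

    /** Creates the confirmation future for a new leadership grant (grant itself omitted). */
    synchronized CompletableFuture<UUID> isLeader(UUID leaderSessionId) {
        confirmationFuture = new CompletableFuture<>();
        return confirmationFuture;
    }

    /** Cleanup cancels a confirmation future that is still pending. */
    synchronized void triggerContenderCleanup() {
        if (confirmationFuture != null) {
            confirmationFuture.cancel(false);
            confirmationFuture = null;
        }
    }

    public static void main(String[] args) {
        ConfirmationFutureCleanupSketch election = new ConfirmationFutureCleanupSketch();
        CompletableFuture<UUID> pending = election.isLeader(UUID.randomUUID());

        election.triggerContenderCleanup();

        System.out.println("cancelled: " + pending.isCancelled());
    }
}
```

A caller that still holds the future after cleanup sees it as cancelled rather than waiting on it indefinitely.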




[GitHub] [flink] reswqa commented on pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1562613113

   > Therefore, it might help to wait with things that are not really pressing issues until we're done with the PRs.
   
   Fair enough. When I tried to modify the code to remove some legacy `ExternalResource`, I somehow forgot that it would conflict with FLINK-26522. I'm sorry to have hindered your progress and thanks for the reminder. 🙇 



[GitHub] [flink] XComp commented on pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1550869360

   @flinkbot run azure



[GitHub] [flink] XComp commented on pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1562594661

   Conflicts happened due to https://github.com/apache/flink/commit/d79f4c1d39c45ac8365fbddbe3ad7e53d42c8f41 and https://github.com/apache/flink/commit/c3409dcd0f77ef3343145ef1a05944a27f4f87ec. I rebased to the most-recent `master` and fixed the conflicts.
   
   @reswqa just keep in mind that fixing legacy issues in leader election-related code slows down the FLINK-26522 efforts (due to possible code conflicts). Therefore, it might help to wait with things that are not really pressing issues until we're done with the PRs. Feel free to jump in and help review these PRs. :-)



[GitHub] [flink] XComp commented on pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1561211642

   Thanks, @zentol . I squashed the commits and rebased the PR to the most-recent `master` for a final CI run before merging.



[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1203496208


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java:
##########
@@ -32,87 +29,49 @@
  */
 public class TestingLeaderElectionService implements LeaderElectionService {
 
-    private LeaderContender contender = null;
-    private boolean hasLeadership = false;
-    private CompletableFuture<LeaderConnectionInfo> confirmationFuture = null;
-    private CompletableFuture<Void> startFuture = new CompletableFuture<>();
-    private UUID issuedLeaderSessionId = null;
-
-    /**
-     * Gets a future that completes when leadership is confirmed.
-     *
-     * <p>Note: the future is created upon calling {@link #isLeader(UUID)}.
-     */
-    public synchronized CompletableFuture<LeaderConnectionInfo> getConfirmationFuture() {
-        return confirmationFuture;
-    }
+    private final TestingLeaderElection startedLeaderElection = new TestingLeaderElection();
 
     @Override
-    public synchronized void start(LeaderContender contender) {
-        Preconditions.checkState(!getStartFuture().isDone());
-
-        this.contender = contender;
-
-        if (hasLeadership) {
-            contender.grantLeadership(issuedLeaderSessionId);
-        }
-
-        startFuture.complete(null);
+    public synchronized LeaderElection createLeaderElection() {
+        return startedLeaderElection;
     }
 
     @Override
     public synchronized void stop() throws Exception {
-        if (hasLeadership && contender != null) {
-            contender.revokeLeadership();
-        }
-
-        contender = null;
-        hasLeadership = false;
-        issuedLeaderSessionId = null;
-        startFuture.cancel(false);
-        startFuture = new CompletableFuture<>();
+        startedLeaderElection.triggerContenderCleanup();
     }
 
-    @Override
-    public synchronized void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
-        if (confirmationFuture != null) {
-            confirmationFuture.complete(new LeaderConnectionInfo(leaderSessionID, leaderAddress));
-        }
+    public synchronized CompletableFuture<LeaderInformation> isLeader(UUID leaderSessionID) {
+        return startedLeaderElection.isLeader(leaderSessionID);
     }
 
-    @Override
-    public synchronized boolean hasLeadership(@Nonnull UUID leaderSessionId) {
-        return hasLeadership && leaderSessionId.equals(issuedLeaderSessionId);
+    public synchronized LeaderInformation isConfirmedLeader(UUID leaderSessionID) {

Review Comment:
   Fair enough, I guess calling `.join()` on the `CompletableFuture` is good enough. I'm gonna add JavaDoc to the `isLeader(UUID)` method, instead.
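
Calling `.join()` on the future returned by `isLeader(UUID)` could look like the following sketch. The class and the `LeaderInformation` stand-in are simplified assumptions for illustration; the address string is made up.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

class JoinOnConfirmationSketch {

    /** Hypothetical stand-in for the confirmed session ID and address. */
    static final class LeaderInformation {
        final UUID leaderSessionId;
        final String leaderAddress;

        LeaderInformation(UUID leaderSessionId, String leaderAddress) {
            this.leaderSessionId = leaderSessionId;
            this.leaderAddress = leaderAddress;
        }
    }

    private CompletableFuture<LeaderInformation> confirmationFuture;

    /** Returns a future that completes once the leadership is confirmed. */
    synchronized CompletableFuture<LeaderInformation> isLeader(UUID leaderSessionId) {
        confirmationFuture = new CompletableFuture<>();
        return confirmationFuture;
    }

    synchronized void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
        if (confirmationFuture != null) {
            confirmationFuture.complete(new LeaderInformation(leaderSessionId, leaderAddress));
        }
    }

    public static void main(String[] args) {
        JoinOnConfirmationSketch election = new JoinOnConfirmationSketch();
        UUID sessionId = UUID.randomUUID();

        CompletableFuture<LeaderInformation> confirmation = election.isLeader(sessionId);

        // Confirm from another thread, roughly as a contender would.
        CompletableFuture.runAsync(() -> election.confirmLeadership(sessionId, "localhost:1234"));

        // join() blocks until the future completes and throws no checked exceptions.
        LeaderInformation confirmed = confirmation.join();
        System.out.println("confirmed address: " + confirmed.leaderAddress);
    }
}
```

With this, a separate blocking helper such as `isConfirmedLeader(UUID)` is not strictly needed in tests.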




[GitHub] [flink] zentol commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1201729146


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * {@code AbstractLeaderElectionService} provides a generic implementation of the {@link
+ * LeaderElection} handling.
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+    @Override
+    public LeaderElection createLeaderElection() {
+        return new LeaderElectionImpl(this);
+    }
+
+    /**
+     * Registers the given {@link LeaderContender} with the underlying {@code
+     * LeaderElectionService}. Leadership changes are starting to be reported to the {@code
+     * LeaderContender}.
+     */
+    protected abstract void register(LeaderContender contender) throws Exception;
+
+    /** Confirms the leadership with the given session ID and address. */
+    protected abstract void confirmLeadership(UUID leaderSessionID, String leaderAddress);
+
+    /**
+     * Checks whether the {@code LeaderElectionService} has the leadership acquired for the given
+     * session ID.
+     *
+     * @return {@code true} if the service has leadership with the passed session ID acquired;
+     *     {@code false} otherwise.
+     */
+    protected abstract boolean hasLeadership(UUID leaderSessionId);
+
+    /** {@code LeaderElectionImpl} is the default implementation of {@link LeaderElection}. */
+    private static class LeaderElectionImpl implements LeaderElection {

Review Comment:
   `protected` members are also accessible from within the same package; we wouldn't need to change anything.
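
The Java access rule referred to here can be checked with a small sketch. Both class names are hypothetical; the two classes are unrelated by inheritance and share a package (the unnamed one in this example).

```java
class ProtectedAccessSketch {

    /** Not a subclass of ServiceWithProtectedHook, yet allowed to call its protected method. */
    static boolean askService(ServiceWithProtectedHook service, String sessionId) {
        return service.hasLeadership(sessionId);
    }

    public static void main(String[] args) {
        ServiceWithProtectedHook service =
                new ServiceWithProtectedHook() {
                    @Override
                    protected boolean hasLeadership(String sessionId) {
                        return "session-1".equals(sessionId);
                    }
                };

        System.out.println(askService(service, "session-1"));
    }
}

/** Lives in the same package as ProtectedAccessSketch. */
abstract class ServiceWithProtectedHook {
    protected abstract boolean hasLeadership(String sessionId);
}
```

Moving `ProtectedAccessSketch` into a different package would turn the call in `askService` into a compile error, which is the difference between `protected` and `public`.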




[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1202068871


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * {@code AbstractLeaderElectionService} provides a generic implementation of the {@link
+ * LeaderElection} handling.
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+    @Override
+    public LeaderElection createLeaderElection() {
+        return new LeaderElectionImpl(this);
+    }
+
+    /**
+     * Registers the given {@link LeaderContender} with the underlying {@code
+     * LeaderElectionService}. Leadership changes are starting to be reported to the {@code
+     * LeaderContender}.
+     */
+    protected abstract void register(LeaderContender contender) throws Exception;
+
+    /** Confirms the leadership with the given session ID and address. */
+    protected abstract void confirmLeadership(UUID leaderSessionID, String leaderAddress);
+
+    /**
+     * Checks whether the {@code LeaderElectionService} has the leadership acquired for the given
+     * session ID.
+     *
+     * @return {@code true} if the service has leadership with the passed session ID acquired;
+     *     {@code false} otherwise.
+     */
+    protected abstract boolean hasLeadership(UUID leaderSessionId);
+
+    /** {@code LeaderElectionImpl} is the default implementation of {@link LeaderElection}. */
+    private static class LeaderElectionImpl implements LeaderElection {

Review Comment:
   > and you ended up doing exactly what I asked for?...
   
   I'm confused by the question mark here. :innocent: I found your proposal reasonable in terms of testability and went ahead with the refactoring. My [previous comment](https://github.com/apache/flink/pull/22384#discussion_r1200710974) just explained my initial reasoning. :-D




[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1200541743


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java:
##########
@@ -120,6 +121,7 @@ void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception
         long sleepingTime = 1000;
 
         LeaderElectionService leaderElectionService = null;
+        LeaderElection leaderElection = null;

Review Comment:
   I haven't pushed the change related to [the comment above](https://github.com/apache/flink/pull/22384#discussion_r1200287477), yet. I'm working on it. The local variable will be removed.




[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1200275972


##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java:
##########
@@ -86,20 +86,20 @@ public List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeH
         return super.initializeHandlers(localAddressFuture);
     }
 
-    private enum NoOpElectionService implements LeaderElectionService {
-        INSTANCE;
+    private static class NoOpElectionService extends AbstractLeaderElectionService {

Review Comment:
   Yes, we can do that. :+1: The `LeaderElectionService` implementation will go away with PR #22404 anyway.




[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1200126782


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java:
##########
@@ -28,7 +28,7 @@
  * Test {@link LeaderElectionService} implementation which directly forwards isLeader and notLeader
  * calls to the contender.
  */
-public class TestingLeaderElectionService implements LeaderElectionService {
+public class TestingLeaderElectionService extends AbstractLeaderElectionService {

Review Comment:
   Fair point. I'm replacing `TestingLeaderElectionService` entirely in PR #22404 (see https://github.com/apache/flink/pull/22404/commits/620f16f114d81c5d7212ef149b142c3e7e02a7e0). But I could move the `TestingLeaderElection` implementation into this PR as well, I guess.




[GitHub] [flink] flinkbot commented on pull request #22384: [WIP][FLINK-31776][runtime] Introduces LeaderElection sub-interface

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1505293260

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d283d8ed9e253af5db248dc61b4f77cb84657767",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d283d8ed9e253af5db248dc61b4f77cb84657767",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d283d8ed9e253af5db248dc61b4f77cb84657767 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>



[GitHub] [flink] zentol commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1196648554


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * {@code AbstractLeaderElectionService} provides a generic implementation of the {@link
+ * LeaderElection} handling.
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+    @Override
+    public LeaderElection createLeaderElection() {
+        return new LeaderElectionImpl(this);
+    }
+
+    /**
+     * Registers the given {@link LeaderContender} with the underlying {@code
+     * LeaderElectionService}. Leadership changes are starting to be reported to the {@code
+     * LeaderContender}.
+     */
+    protected abstract void register(LeaderContender contender) throws Exception;
+
+    /** Confirms the leadership with the given session ID and address. */
+    protected abstract void confirmLeadership(UUID leaderSessionID, String leaderAddress);
+
+    /**
+     * Checks whether the {@code LeaderElectionService} has the leadership acquired for the given
+     * session ID.
+     *
+     * @return {@code true} if the service has leadership with the passed session ID acquired;
+     *     {@code false} otherwise.
+     */
+    protected abstract boolean hasLeadership(UUID leaderSessionId);
+
+    /** {@code LeaderElectionImpl} is the default implementation of {@link LeaderElection}. */
+    private static class LeaderElectionImpl implements LeaderElection {

Review Comment:
   can we move this into a dedicated class and rename it to `DefaultLeaderElection`?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java:
##########
@@ -28,7 +28,7 @@
  * Test {@link LeaderElectionService} implementation which directly forwards isLeader and notLeader
  * calls to the contender.
  */
-public class TestingLeaderElectionService implements LeaderElectionService {
+public class TestingLeaderElectionService extends AbstractLeaderElectionService {

Review Comment:
   I don't know, it seems odd that this relies on a concrete implementation.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java:
##########
@@ -86,20 +86,20 @@ public List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeH
         return super.initializeHandlers(localAddressFuture);
     }
 
-    private enum NoOpElectionService implements LeaderElectionService {
-        INSTANCE;
+    private static class NoOpElectionService extends AbstractLeaderElectionService {

Review Comment:
   Shouldn't this keep implementing `LeaderElectionService` to avoid dependencies on runtime internals?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java:
##########
@@ -41,13 +42,20 @@ public TestingContender(
         this.leaderElectionService = leaderElectionService;
     }
 
+    public LeaderElection startLeaderElection() throws Exception {
+        leaderElection = leaderElectionService.createLeaderElection();

Review Comment:
   Should this be set up in the constructor instead? Maybe even the start of the leader election.
   
   So far, all production usages set this up when a component is started, and the equivalent of that in this class is the constructor.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java:
##########
@@ -120,6 +121,7 @@ void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception
         long sleepingTime = 1000;
 
         LeaderElectionService leaderElectionService = null;
+        LeaderElection leaderElection = null;

Review Comment:
   unused?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java:
##########
@@ -147,7 +147,8 @@ private void runTestWithZooKeeperConnectionProblem(
             client.getConnectionStateListenable().addListener(connectionStateListener);
 
             final TestingContender contender = new TestingContender();
-            leaderElectionService.start(contender);
+            final LeaderElection leaderElection = leaderElectionService.createLeaderElection();

Review Comment:
   nit: variable may be unnecessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1200614397


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java:
##########
@@ -41,13 +42,20 @@ public TestingContender(
         this.leaderElectionService = leaderElectionService;
     }
 
+    public LeaderElection startLeaderElection() throws Exception {
+        leaderElection = leaderElectionService.createLeaderElection();

Review Comment:
   `LeaderElection.startLeaderElection` needs to be called outside of the contender's constructor for now. The reason is the cyclic dependency between `DefaultLeaderElectionService` and the `MultipleComponentLeaderElectionDriverAdapter`: registering a contender in the `MultipleComponentLeaderElectionService` through the adapter might result in an immediate leadership grant to the contender if the leadership was acquired beforehand. Calling `startLeaderElection` from within the constructor with `this` (the contender) as a parameter would therefore trigger the grant call on a not-fully-initialized instance.
   
   It's essentially the same reason why we cannot move `DefaultLeaderElectionService#startLeaderElectionBackend()` into the `DefaultLeaderElectionService` constructor. We can do this after all the other refactoring is done.
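   
   A minimal sketch of the problem described above, assuming simplified stand-in types (`SketchContender`, `SketchElection` and the contender classes are hypothetical and not part of Flink). It only illustrates why an immediate grant is unsafe while the contender's constructor is still running:

```java
import java.util.UUID;

// Hypothetical, simplified stand-ins; not the actual Flink interfaces.
interface SketchContender {
    void grantLeadership(UUID leaderSessionId);
}

interface SketchElection {
    void startLeaderElection(SketchContender contender);
}

/** Grants leadership during registration, as if leadership had been acquired beforehand. */
class ImmediateGrantElection implements SketchElection {
    @Override
    public void startLeaderElection(SketchContender contender) {
        contender.grantLeadership(UUID.randomUUID());
    }
}

/** Starts the election in the constructor, so 'this' escapes before 'address' is set. */
class EagerContender implements SketchContender {
    private final String address;
    String addressSeenOnGrant = "<no grant yet>";

    EagerContender(SketchElection election, String address) {
        election.startLeaderElection(this); // the grant arrives while 'address' is still null
        this.address = address;
    }

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        addressSeenOnGrant = address;
    }
}

/** Finishes construction first; the election is started by a separate call. */
class LazyContender implements SketchContender {
    private final String address;
    String addressSeenOnGrant = "<no grant yet>";

    LazyContender(String address) {
        this.address = address;
    }

    void start(SketchElection election) {
        election.startLeaderElection(this);
    }

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        addressSeenOnGrant = address;
    }
}

class ConstructorEscapeDemo {
    public static void main(String[] args) {
        SketchElection election = new ImmediateGrantElection();

        EagerContender eager = new EagerContender(election, "host:6123");
        System.out.println("eager saw: " + eager.addressSeenOnGrant); // prints "eager saw: null"

        LazyContender lazy = new LazyContender("host:6123");
        lazy.start(election);
        System.out.println("lazy saw: " + lazy.addressSeenOnGrant); // prints "lazy saw: host:6123"
    }
}
```

   The eager variant observes a `null` address because the grant callback runs before the constructor has assigned the field, which is the kind of partially initialized state the separate start call avoids.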





[GitHub] [flink] XComp commented on pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1557478203

   I addressed the comments... PTAL




[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1203499663


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code TestingLeaderElection} implements simple leader election for test cases where no {@code
+ * LeaderElectionService} is required.
+ */
+public class TestingLeaderElection implements LeaderElection {
+
+    /**
+     * Is {@code null} if the {@code LeaderElection} isn't started.
+     *
+     * @see LeaderElection#startLeaderElection(LeaderContender)
+     */
+    @Nullable private LeaderContender contender = null;
+
+    @Nullable private CompletableFuture<LeaderInformation> confirmationFuture = null;

Review Comment:
   OK, we still need to keep the method around (because it's needed by `TestingLeaderElectionService.stop()`). The method will become `LeaderElection.close()` in FLINK-31785.





[GitHub] [flink] XComp commented on pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on PR #22384:
URL: https://github.com/apache/flink/pull/22384#issuecomment-1558629372

   Force-pushed to rebase this PR onto upstream `master` after https://github.com/apache/flink/pull/22623 was merged.




[GitHub] [flink] zentol commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1201735087


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * {@code AbstractLeaderElectionService} provides a generic implementation of the {@link
+ * LeaderElection} handling.
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+    @Override
+    public LeaderElection createLeaderElection() {
+        return new LeaderElectionImpl(this);
+    }
+
+    /**
+     * Registers the given {@link LeaderContender} with the underlying {@code
+     * LeaderElectionService}. Leadership changes are starting to be reported to the {@code
+     * LeaderContender}.
+     */
+    protected abstract void register(LeaderContender contender) throws Exception;
+
+    /** Confirms the leadership with the given session ID and address. */
+    protected abstract void confirmLeadership(UUID leaderSessionID, String leaderAddress);
+
+    /**
+     * Checks whether the {@code LeaderElectionService} has the leadership acquired for the given
+     * session ID.
+     *
+     * @return {@code true} if the service has leadership with the passed session ID acquired;
+     *     {@code false} otherwise.
+     */
+    protected abstract boolean hasLeadership(UUID leaderSessionId);
+
+    /** {@code LeaderElectionImpl} is the default implementation of {@link LeaderElection}. */
+    private static class LeaderElectionImpl implements LeaderElection {

Review Comment:
   and you ended up doing exactly what I asked for?...





[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1202637964


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code TestingLeaderElection} implements simple leader election for test cases where no {@code
+ * LeaderElectionService} is required.
+ */
+public class TestingLeaderElection implements LeaderElection {
+
+    /**
+     * Is {@code null} if the {@code LeaderElection} isn't started.
+     *
+     * @see LeaderElection#startLeaderElection(LeaderContender)
+     */
+    @Nullable private LeaderContender contender = null;
+
+    @Nullable private CompletableFuture<LeaderInformation> confirmationFuture = null;

Review Comment:
   Hm, that got me thinking. I guess you're right: the `confirmationFuture` is bound to the `LeaderContender`. Resetting the contender should, indeed, also cause the `confirmationFuture` to be cancelled.
   
   But I did another round of digging: `triggerContenderCleanup` was actually only exposed because of [DispatcherCleanupITCase:309](https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java#L309), where it should have been used instead of calling stop (which felt unnatural in this case because we didn't actually want to stop the leader election but just wanted to reset the instance).
   
   The test itself seems odd: we don't actually need to reset the leader election because the following code just starts a cleanup process which doesn't rely on leader election anymore. This change happened in FLINK-25432 (in https://github.com/apache/flink/commit/cc5d321d). The test wasn't properly cleaned up/refactored to reflect the new behavior. Therefore, we could just remove the leader election reset. As a consequence, there wouldn't be a need to expose `triggerContenderCleanup`.
   
   I'm going to provide a hotfix commit to clean the test up and revert the exposure of the `triggerContenderCleanup` method. Test code shouldn't be able to reset the leader election because that might make it possible to work around badly structured code.
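   
   For reference, a rough sketch of what cancelling the future together with the contender reset could look like. `ResettableElection` and its method names are hypothetical and heavily simplified; this is not the actual `TestingLeaderElection` code:

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified testing election; not the actual TestingLeaderElection. */
class ResettableElection {
    private Object contender; // stand-in for a LeaderContender
    private CompletableFuture<UUID> confirmationFuture;

    synchronized void startLeaderElection(Object contender) {
        this.contender = contender;
    }

    /** Grants leadership; the returned future completes once the contender confirms. */
    synchronized CompletableFuture<UUID> isLeader(UUID leaderSessionId) {
        confirmationFuture = new CompletableFuture<>();
        return confirmationFuture;
    }

    synchronized void confirmLeadership(UUID leaderSessionId) {
        if (confirmationFuture != null) {
            confirmationFuture.complete(leaderSessionId);
        }
    }

    /** Resets the contender and cancels the pending confirmation that was bound to it. */
    synchronized void resetContender() {
        contender = null;
        if (confirmationFuture != null) {
            confirmationFuture.cancel(false);
            confirmationFuture = null;
        }
    }
}

class ResettableElectionDemo {
    public static void main(String[] args) {
        ResettableElection election = new ResettableElection();
        election.startLeaderElection(new Object());

        UUID sessionId = UUID.randomUUID();
        CompletableFuture<UUID> pending = election.isLeader(sessionId);
        election.resetContender();

        // A late confirmation no longer affects the already cancelled future.
        election.confirmLeadership(sessionId);
        System.out.println("cancelled: " + pending.isCancelled()); // prints "cancelled: true"
    }
}
```

   Cancelling in the reset keeps callers from waiting forever on a confirmation that can no longer arrive from the removed contender.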





[GitHub] [flink] zentol commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1201761208


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java:
##########
@@ -32,87 +29,49 @@
  */
 public class TestingLeaderElectionService implements LeaderElectionService {
 
-    private LeaderContender contender = null;
-    private boolean hasLeadership = false;
-    private CompletableFuture<LeaderConnectionInfo> confirmationFuture = null;
-    private CompletableFuture<Void> startFuture = new CompletableFuture<>();
-    private UUID issuedLeaderSessionId = null;
-
-    /**
-     * Gets a future that completes when leadership is confirmed.
-     *
-     * <p>Note: the future is created upon calling {@link #isLeader(UUID)}.
-     */
-    public synchronized CompletableFuture<LeaderConnectionInfo> getConfirmationFuture() {
-        return confirmationFuture;
-    }
+    private final TestingLeaderElection startedLeaderElection = new TestingLeaderElection();
 
     @Override
-    public synchronized void start(LeaderContender contender) {
-        Preconditions.checkState(!getStartFuture().isDone());
-
-        this.contender = contender;
-
-        if (hasLeadership) {
-            contender.grantLeadership(issuedLeaderSessionId);
-        }
-
-        startFuture.complete(null);
+    public synchronized LeaderElection createLeaderElection() {
+        return startedLeaderElection;
     }
 
     @Override
     public synchronized void stop() throws Exception {
-        if (hasLeadership && contender != null) {
-            contender.revokeLeadership();
-        }
-
-        contender = null;
-        hasLeadership = false;
-        issuedLeaderSessionId = null;
-        startFuture.cancel(false);
-        startFuture = new CompletableFuture<>();
+        startedLeaderElection.triggerContenderCleanup();
     }
 
-    @Override
-    public synchronized void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
-        if (confirmationFuture != null) {
-            confirmationFuture.complete(new LeaderConnectionInfo(leaderSessionID, leaderAddress));
-        }
+    public synchronized CompletableFuture<LeaderInformation> isLeader(UUID leaderSessionID) {
+        return startedLeaderElection.isLeader(leaderSessionID);
     }
 
-    @Override
-    public synchronized boolean hasLeadership(@Nonnull UUID leaderSessionId) {
-        return hasLeadership && leaderSessionId.equals(issuedLeaderSessionId);
+    public synchronized LeaderInformation isConfirmedLeader(UUID leaderSessionID) {

Review Comment:
   The intention is good but I was a tad confused why isLeader and isConfirmedLeader return different types.
   
   I feel like calling `isLeader(...).join()` explicitly in the test, or renaming this method to `isLeaderAndWaitForConfirm`, may alleviate this.
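   
   To illustrate the two options, a small sketch under simplifying assumptions: `ConfirmingElection` is a hypothetical stand-in whose contender confirms synchronously, and the future carries a plain `String` instead of `LeaderInformation`:

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a testing election whose contender confirms right away. */
class ConfirmingElection {
    private final String contenderAddress;
    private CompletableFuture<String> confirmationFuture;

    ConfirmingElection(String contenderAddress) {
        this.contenderAddress = contenderAddress;
    }

    /** Grants leadership and returns a future that completes on confirmation. */
    CompletableFuture<String> isLeader(UUID leaderSessionId) {
        confirmationFuture = new CompletableFuture<>();
        // Stand-in for the contender reacting to the grant by confirming synchronously.
        confirmLeadership(leaderSessionId, contenderAddress);
        return confirmationFuture;
    }

    void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
        confirmationFuture.complete(leaderSessionId + "@" + leaderAddress);
    }

    /** The name states that this call blocks until the confirmation arrived. */
    String isLeaderAndWaitForConfirm(UUID leaderSessionId) {
        return isLeader(leaderSessionId).join();
    }
}

class ConfirmingElectionDemo {
    public static void main(String[] args) {
        ConfirmingElection election = new ConfirmingElection("host:6123");
        UUID sessionId = UUID.randomUUID();

        // Option 1: the test joins the future explicitly.
        String viaJoin = election.isLeader(sessionId).join();

        // Option 2: the helper's name makes the blocking behavior obvious.
        String viaHelper = election.isLeaderAndWaitForConfirm(sessionId);

        System.out.println(viaJoin.equals(viaHelper)); // prints "true"
    }
}
```

   Either way, the differing return types stop being surprising because the blocking step is visible at the call site or in the method name.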



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code TestingLeaderElection} implements simple leader election for test cases where no {@code
+ * LeaderElectionService} is required.
+ */
+public class TestingLeaderElection implements LeaderElection {
+
+    /**
+     * Is {@code null} if the {@code LeaderElection} isn't started.
+     *
+     * @see LeaderElection#startLeaderElection(LeaderContender)
+     */
+    @Nullable private LeaderContender contender = null;
+
+    @Nullable private CompletableFuture<LeaderInformation> confirmationFuture = null;

Review Comment:
   Wondering whether we should cancel this in `triggerContenderCleanup`.





[GitHub] [flink] zentol commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1200370088


##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java:
##########
@@ -120,6 +121,7 @@ void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception
         long sleepingTime = 1000;
 
         LeaderElectionService leaderElectionService = null;
+        LeaderElection leaderElection = null;

Review Comment:
   But why add the field now instead of in the other PR? :thinking:





[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1200710974


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * {@code AbstractLeaderElectionService} provides a generic implementation of the {@link
+ * LeaderElection} handling.
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+    @Override
+    public LeaderElection createLeaderElection() {
+        return new LeaderElectionImpl(this);
+    }
+
+    /**
+     * Registers the given {@link LeaderContender} with the underlying {@code
+     * LeaderElectionService}. Leadership changes are starting to be reported to the {@code
+     * LeaderContender}.
+     */
+    protected abstract void register(LeaderContender contender) throws Exception;
+
+    /** Confirms the leadership with the given session ID and address. */
+    protected abstract void confirmLeadership(UUID leaderSessionID, String leaderAddress);
+
+    /**
+     * Checks whether the {@code LeaderElectionService} has the leadership acquired for the given
+     * session ID.
+     *
+     * @return {@code true} if the service has leadership with the passed session ID acquired;
+     *     {@code false} otherwise.
+     */
+    protected abstract boolean hasLeadership(UUID leaderSessionId);
+
+    /** {@code LeaderElectionImpl} is the default implementation of {@link LeaderElection}. */
+    private static class LeaderElectionImpl implements LeaderElection {

Review Comment:
   The idea behind implementing `LeaderElectionImpl` as an inner class was to hide the `LeaderElection`-specific methods from other implementations: `StandaloneLeaderElectionService` would require these methods to become package-private, and `EmbeddedLeaderElectionService` is not even in the same package. But I noticed that I can achieve the same with a regular class, which should also improve testability later on. I created `DefaultLeaderElectionTest` with quite basic test methods for now and hope to extend it gradually in the follow-up PRs.
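   
   Roughly, the structure looks like the following sketch. All names here (`ElectionHandle`, `AbstractElectionService`, `DefaultElectionHandle`, `SingleSessionService`) are hypothetical placeholders and the signatures are simplified; the actual classes in the PR may differ:

```java
import java.util.UUID;

/** Hypothetical stand-in for the contender-facing LeaderElection interface. */
interface ElectionHandle {
    void confirmLeadership(UUID leaderSessionId, String leaderAddress);

    boolean hasLeadership(UUID leaderSessionId);
}

/** The election-specific methods stay protected instead of being part of a public interface. */
abstract class AbstractElectionService {
    ElectionHandle createLeaderElection() {
        return new DefaultElectionHandle(this);
    }

    protected abstract void confirmLeadership(UUID leaderSessionId, String leaderAddress);

    protected abstract boolean hasLeadership(UUID leaderSessionId);
}

/** Regular (non-inner) class in the same package; it only delegates to the parent service. */
class DefaultElectionHandle implements ElectionHandle {
    private final AbstractElectionService parentService;

    DefaultElectionHandle(AbstractElectionService parentService) {
        this.parentService = parentService;
    }

    @Override
    public void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
        parentService.confirmLeadership(leaderSessionId, leaderAddress);
    }

    @Override
    public boolean hasLeadership(UUID leaderSessionId) {
        return parentService.hasLeadership(leaderSessionId);
    }
}

/** Toy service that holds leadership for exactly one session ID. */
class SingleSessionService extends AbstractElectionService {
    private final UUID grantedSessionId;
    String confirmedAddress;

    SingleSessionService(UUID grantedSessionId) {
        this.grantedSessionId = grantedSessionId;
    }

    @Override
    protected void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
        if (grantedSessionId.equals(leaderSessionId)) {
            confirmedAddress = leaderAddress;
        }
    }

    @Override
    protected boolean hasLeadership(UUID leaderSessionId) {
        return grantedSessionId.equals(leaderSessionId);
    }
}
```

   Because the handle is a standalone class, it can be unit-tested against any `AbstractElectionService` subclass without going through a concrete production service.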





[GitHub] [flink] XComp merged pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp merged PR #22384:
URL: https://github.com/apache/flink/pull/22384




[GitHub] [flink] XComp commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection sub-interface

Posted by "XComp (via GitHub)" <gi...@apache.org>.
XComp commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1168749109


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java:
##########
@@ -53,24 +50,35 @@ public interface LeaderElectionService {
     void stop() throws Exception;
 
     /**
-     * Confirms that the {@link LeaderContender} has accepted the leadership identified by the given
-     * leader session id. It also publishes the leader address under which the leader is reachable.
-     *
-     * <p>The rational behind this method is to establish an order between setting the new leader
-     * session ID in the {@link LeaderContender} and publishing the new leader session ID as well as
-     * the leader address to the leader retrieval services.
-     *
-     * @param leaderSessionID The new leader session ID
-     * @param leaderAddress The address of the new leader
+     * {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link
+     * LeaderContender}.
      */
-    void confirmLeadership(UUID leaderSessionID, String leaderAddress);
+    interface LeaderElection {

Review Comment:
   While working on the follow-up PRs (more specifically https://github.com/apache/flink/pull/22404), and especially while refactoring all the tests, I've started to believe that having this as a sub-interface isn't the best choice. `StandaloneLeaderElectionService` will be replaced by a `StandaloneLeaderElection` implementation, and the test cases that rely on a `LeaderElection` don't necessarily rely on the `LeaderElectionService`. Keeping the two separate and less tightly coupled feels like the more reasonable thing to do. :thinking: WDYT?





[GitHub] [flink] zentol commented on a diff in pull request #22384: [FLINK-31776][runtime] Introduces LeaderElection sub-interface

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22384:
URL: https://github.com/apache/flink/pull/22384#discussion_r1170045731


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java:
##########
@@ -53,24 +50,35 @@ public interface LeaderElectionService {
     void stop() throws Exception;
 
     /**
-     * Confirms that the {@link LeaderContender} has accepted the leadership identified by the given
-     * leader session id. It also publishes the leader address under which the leader is reachable.
-     *
-     * <p>The rational behind this method is to establish an order between setting the new leader
-     * session ID in the {@link LeaderContender} and publishing the new leader session ID as well as
-     * the leader address to the leader retrieval services.
-     *
-     * @param leaderSessionID The new leader session ID
-     * @param leaderAddress The address of the new leader
+     * {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link
+     * LeaderContender}.
      */
-    void confirmLeadership(UUID leaderSessionID, String leaderAddress);
+    interface LeaderElection {

Review Comment:
   Can't hurt to separate them :shrug:


