Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/22 17:26:48 UTC

[GitHub] [kafka] jsancio opened a new pull request #11109: KAFKA-13113: Support unregistering Raft listeners

jsancio opened a new pull request #11109:
URL: https://github.com/apache/kafka/pull/11109


   Support unregistering by returning a `ListenerContext` on registration and exposing a `close` method on the returned `ListenerContext`. To let the user reuse the same `Listener` across different registrations, the associated `ListenerContext` is passed to every method of the Raft `Listener` interface.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jsancio commented on a change in pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675065535



##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -362,24 +363,27 @@ LeaderAndEpoch notifiedLeader() {
         }
 
         void handleCommit(MemoryBatchReader<ApiMessageAndVersion> reader) {
-            listener.handleCommit(reader);
+            listener.handleCommit(this, reader);
             offset = reader.lastOffset().getAsLong();
         }
 
         void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-            listener.handleSnapshot(reader);
+            listener.handleSnapshot(this, reader);
             offset = reader.lastContainedLogOffset();
         }
 
         void handleLeaderChange(long offset, LeaderAndEpoch leader) {
-            listener.handleLeaderChange(leader);
+            listener.handleLeaderChange(this, leader);
             notifiedLeader = leader;
             this.offset = offset;
         }
 
         void beginShutdown() {
-            listener.beginShutdown();
+            listener.beginShutdown(this);
         }
+
+        @Override
+        public void close() {}

Review comment:
       This should be fixed by appending an event to `eventQueue` that removes this listener.
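
A minimal sketch of that idea, assuming a single-threaded queue of `Runnable` events: `close()` does not touch the registration list directly but appends a removal event, so the removal is ordered with everything else already queued. `SketchLogManager`, `SketchRegistration`, and `drain()` are hypothetical stand-ins for illustration and are not the actual `LocalLogManager` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a log manager whose state is only changed by queued events.
final class SketchLogManager {
    private final Queue<Runnable> eventQueue = new ArrayDeque<>();
    private final List<SketchRegistration> registrations = new ArrayList<>();

    // One object per registered listener; close() requests its removal.
    final class SketchRegistration implements AutoCloseable {
        @Override
        public void close() {
            // Do not remove directly: append an event that removes this registration.
            eventQueue.add(() -> registrations.remove(this));
        }
    }

    SketchRegistration register() {
        SketchRegistration registration = new SketchRegistration();
        eventQueue.add(() -> registrations.add(registration));
        return registration;
    }

    // Runs all pending events in order, as the event-queue thread would.
    void drain() {
        Runnable event;
        while ((event = eventQueue.poll()) != null) {
            event.run();
        }
    }

    int registrationCount() {
        return registrations.size();
    }
}

final class CloseViaQueueDemo {
    // Returns the registration count after register, after close() but before
    // the queue is drained, and after the removal event has run.
    static int[] run() {
        SketchLogManager manager = new SketchLogManager();
        SketchLogManager.SketchRegistration registration = manager.register();
        manager.drain();
        int afterRegister = manager.registrationCount();
        registration.close();
        int beforeDrain = manager.registrationCount();
        manager.drain();
        int afterDrain = manager.registrationCount();
        return new int[] {afterRegister, beforeDrain, afterDrain};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

In this sketch the registration stays visible until the queue processes the removal event, which is the ordering property the comment asks for.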

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -2491,6 +2492,47 @@ public void testLateRegisteredListenerCatchesUp() throws Exception {
         assertEquals(9L, secondListener.claimedEpochStartOffset(epoch));
     }
 
+    @Test
+    public void testReregistrationChangesListenerContext() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        List<String> batch1 = Arrays.asList("1", "2", "3");
+        List<String> batch2 = Arrays.asList("4", "5", "6");
+        List<String> batch3 = Arrays.asList("7", "8", "9");
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .appendToLog(1, batch1)
+            .appendToLog(1, batch2)
+            .appendToLog(2, batch3)
+            .withUnknownLeader(epoch - 1)
+            .build();
+
+        context.becomeLeader();
+        context.client.poll();
+        assertEquals(10L, context.log.endOffset().offset);
+
+        // Let the initial listener catch up
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));

Review comment:
       Use the helper method "advance high-watermark" in a few of these places.







[GitHub] [kafka] hachikuji merged pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11109:
URL: https://github.com/apache/kafka/pull/11109


   





[GitHub] [kafka] jsancio commented on a change in pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675911600



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {
+            if (ops == RegistrationOps.REGISTER) {
+                if (contexts.putIfAbsent(listener, new KafkaListenerContext(listener)) != null) {

Review comment:
       Done.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -89,15 +89,32 @@ default void beginShutdown() {}
     void initialize();
 
     /**
-     * Register a listener to get commit/leader notifications.
+     * Register a listener to get commit, snapshot and leader notifications.
      *
-     * @param listener the listener
+     * The implementation of this interface assumes that each call to {@code register} uses
+     * a different {@code Listener} instance. If the same instance is used for multiple calls
+     * to this method, then only one {@code Listener} will be registered.
+     *
+     * @param listener the listener to register
      */
     void register(Listener<T> listener);
 
+    /**
+     * Unregisters a listener.
+     *
+     * To distinguish from events that happened before the call to {@code unregister} and a future
+     * call to {@code register}, then different {@code Listener} instances must be used.

Review comment:
       Done







[GitHub] [kafka] jsancio commented on pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
jsancio commented on pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#issuecomment-885800314


   > I don't see why we need this context object. The listener itself is the context object. It should be able to unregister itself by just calling `RaftClient#unregister(this)`.
   > 
   > You need a context object in non-object-oriented languages like C, where you can't call `Foo#bar(a, b)` but have to call `bar(Foo foo, a, b)`. Since Java is object-oriented, the context is already in `Foo`, so there is no need for a helper object.
   
   Yeah. At a high level, we need to support two requirements:
   1. Unregistering a `Listener`.
   2. The Controller needs to know whether a given `handleCommit`, `handleSnapshot` or `handleLeaderChange` call is for the previous registration or the current one.
   
   With the implementation in this PR, the Controller can do the following to determine whether those events are for the current registration or the previous one:
   https://github.com/apache/kafka/pull/11116/files#diff-77dc2adb187fd078084644613cff2b53021c8a5fbcdcfa116515734609d1332a
   
   If we don't expose the `ListenerContext`, the Controller needs to use a different `Listener` instance for the new registration so that it can determine whether a `handleCommit`, `handleSnapshot` or `handleLeaderChange` call is for the current registration. With this alternative, it is up to the user (the Controller) to guarantee this; the RaftClient cannot help with the check.
   
   I am okay with either solution as there are pros and cons to both.
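
The alternative without an exposed `ListenerContext` might look roughly like the sketch below: the controller creates a fresh listener object for each registration and ignores callbacks arriving on any instance that is no longer current. `CommitListener`, `SketchController`, and `newRegistration()` are hypothetical names, and the single `handleCommit(String)` method is a simplification of the real callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified listener interface (not the real RaftClient.Listener).
interface CommitListener {
    void handleCommit(String batch);
}

final class SketchController {
    private final List<String> applied = new ArrayList<>();
    private RegistrationListener current;

    // A small helper object; one instance is created per registration.
    final class RegistrationListener implements CommitListener {
        @Override
        public void handleCommit(String batch) {
            if (this != current) {
                // Event belongs to a previous registration: ignore it.
                return;
            }
            applied.add(batch);
        }
    }

    // Called whenever the controller registers again (register/unregister calls omitted).
    RegistrationListener newRegistration() {
        current = new RegistrationListener();
        return current;
    }

    List<String> applied() {
        return applied;
    }
}

final class ReregistrationDemo {
    static List<String> run() {
        SketchController controller = new SketchController();
        CommitListener first = controller.newRegistration();
        first.handleCommit("a");
        CommitListener second = controller.newRegistration();
        first.handleCommit("stale");  // late event from the previous registration
        second.handleCommit("b");
        return controller.applied();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The identity check lives entirely in the user's code here, which is the trade-off described above: the client cannot verify that a new instance was actually used.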





[GitHub] [kafka] hachikuji commented on a change in pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675907789



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {
+            if (ops == RegistrationOps.REGISTER) {
+                if (contexts.putIfAbsent(listener, new KafkaListenerContext(listener)) != null) {
+                    logger.error("Attempting to add a listener that already exists: {}", listenerName(listener));
+                }
+            } else {
+                if (contexts.remove(listener) == null) {
+                    logger.error("Attempting to remove a listener context that doesn't exist: {}", listenerName(listener));
+                }
+            }
+        }
+    }
+
+    private final class KafkaListenerContext implements CloseListener<BatchReader<T>> {

Review comment:
       nit: is the `Kafka` prefix adding much value here?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {
+            if (ops == RegistrationOps.REGISTER) {
+                if (contexts.putIfAbsent(listener, new KafkaListenerContext(listener)) != null) {

Review comment:
       Could we have an INFO message for the happy paths in here?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {

Review comment:
       nit: this feels a tad unnatural. We already have an implicit reference to `listenerContexts`. Could we move this method into `KafkaRaftClient` and make `Registration` a static class?
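
One possible shape for that refactoring, as a sketch only: `Registration` becomes a static value class holding the operation and the listener, and the map update moves into the client, which already owns `listenerContexts`. `SketchRaftClient`, `RaftListener`, and `updateListenerContexts` are hypothetical names, and log lines are collected in a list rather than sent to a real logger so the example stays self-contained. It also logs the successful paths at INFO, which is an assumption about how such messages could be added.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Raft listener interface.
interface RaftListener<T> { }

final class SketchRaftClient<T> {
    enum RegistrationOps { REGISTER, UNREGISTER }

    // Static: carries only the request and holds no implicit reference to the client.
    static final class Registration<U> {
        final RegistrationOps ops;
        final RaftListener<U> listener;

        Registration(RegistrationOps ops, RaftListener<U> listener) {
            this.ops = ops;
            this.listener = listener;
        }
    }

    static final class ListenerContext<U> {
        final RaftListener<U> listener;

        ListenerContext(RaftListener<U> listener) {
            this.listener = listener;
        }
    }

    private final Map<RaftListener<T>, ListenerContext<T>> listenerContexts = new HashMap<>();
    final List<String> log = new ArrayList<>();  // stands in for a logger

    // The update lives on the client, next to the map it modifies.
    void updateListenerContexts(Registration<T> registration) {
        RaftListener<T> listener = registration.listener;
        if (registration.ops == RegistrationOps.REGISTER) {
            if (listenerContexts.putIfAbsent(listener, new ListenerContext<>(listener)) != null) {
                log.add("ERROR Attempting to add a listener that already exists");
            } else {
                log.add("INFO Registered the listener");
            }
        } else {
            if (listenerContexts.remove(listener) == null) {
                log.add("ERROR Attempting to remove a listener that doesn't exist");
            } else {
                log.add("INFO Unregistered the listener");
            }
        }
    }
}

final class RegistrationDemo {
    static List<String> run() {
        SketchRaftClient<String> client = new SketchRaftClient<>();
        RaftListener<String> listener = new RaftListener<String>() { };
        SketchRaftClient.RegistrationOps register = SketchRaftClient.RegistrationOps.REGISTER;
        SketchRaftClient.RegistrationOps unregister = SketchRaftClient.RegistrationOps.UNREGISTER;
        client.updateListenerContexts(new SketchRaftClient.Registration<>(register, listener));
        client.updateListenerContexts(new SketchRaftClient.Registration<>(register, listener));
        client.updateListenerContexts(new SketchRaftClient.Registration<>(unregister, listener));
        client.updateListenerContexts(new SketchRaftClient.Registration<>(unregister, listener));
        return client.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```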

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -89,15 +89,32 @@ default void beginShutdown() {}
     void initialize();
 
     /**
-     * Register a listener to get commit/leader notifications.
+     * Register a listener to get commit, snapshot and leader notifications.
      *
-     * @param listener the listener
+     * The implementation of this interface assumes that each call to {@code register} uses
+     * a different {@code Listener} instance. If the same instance is used for multiple calls
+     * to this method, then only one {@code Listener} will be registered.
+     *
+     * @param listener the listener to register
      */
     void register(Listener<T> listener);
 
+    /**
+     * Unregisters a listener.
+     *
+     * To distinguish from events that happened before the call to {@code unregister} and a future
+     * call to {@code register}, then different {@code Listener} instances must be used.

Review comment:
       nit: remove "then"







[GitHub] [kafka] jsancio commented on a change in pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675915832



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {
+            if (ops == RegistrationOps.REGISTER) {
+                if (contexts.putIfAbsent(listener, new KafkaListenerContext(listener)) != null) {
+                    logger.error("Attempting to add a listener that already exists: {}", listenerName(listener));
+                }
+            } else {
+                if (contexts.remove(listener) == null) {
+                    logger.error("Attempting to remove a listener context that doesn't exist: {}", listenerName(listener));
+                }
+            }
+        }
+    }
+
+    private final class KafkaListenerContext implements CloseListener<BatchReader<T>> {

Review comment:
       Done.







[GitHub] [kafka] jsancio commented on a change in pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675914887



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {

Review comment:
       Done.







[GitHub] [kafka] cmccabe commented on pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#issuecomment-885802283


   Thanks for the explanation. I'd prefer just using a different listener instance. That should be straightforward for the controller and broker, since they already use small helper objects for this.





[GitHub] [kafka] cmccabe commented on pull request #11109: KAFKA-13113: Support unregistering Raft listeners

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#issuecomment-885788949


   I don't see why we need this context object. The listener itself is the context object. It should be able to unregister itself by just calling `RaftClient#unregister(this)`.
   
   You need a context object in non-object-oriented languages like C, where you can't call `Foo#bar(a, b)` but have to call `bar(Foo foo, a, b)`. Since Java is object-oriented, the context is already in `Foo`, so there is no need for a helper object.
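
As a rough illustration of the listener acting as its own context, the sketch below has a listener keep a reference to the client and call `unregister(this)` from inside a callback. `SketchClient`, `SketchListener`, and `OneShotListener` are hypothetical simplifications and do not mirror the real `RaftClient` signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified listener interface (not the real RaftClient.Listener).
interface SketchListener {
    void handleEvent(String event);
}

final class SketchClient {
    private final Set<SketchListener> listeners = new LinkedHashSet<>();

    void register(SketchListener listener) {
        listeners.add(listener);
    }

    void unregister(SketchListener listener) {
        listeners.remove(listener);
    }

    void publish(String event) {
        // Iterate over a copy so a listener may unregister itself during dispatch.
        for (SketchListener listener : new ArrayList<>(listeners)) {
            listener.handleEvent(event);
        }
    }

    int listenerCount() {
        return listeners.size();
    }
}

// The listener carries its own context: it knows the client and removes itself.
final class OneShotListener implements SketchListener {
    private final SketchClient client;
    private int seen = 0;

    OneShotListener(SketchClient client) {
        this.client = client;
    }

    @Override
    public void handleEvent(String event) {
        seen++;
        client.unregister(this);
    }

    int seen() {
        return seen;
    }
}

final class SelfUnregisterDemo {
    // Returns how many events the listener saw and how many listeners remain.
    static int[] run() {
        SketchClient client = new SketchClient();
        OneShotListener listener = new OneShotListener(client);
        client.register(listener);
        client.publish("first");
        client.publish("second");
        return new int[] {listener.seen(), client.listenerCount()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```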

