Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/23 23:25:57 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11109: KAFKA-13113: Support unregistering Raft listeners

hachikuji commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675907789



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {
+            if (ops == RegistrationOps.REGISTER) {
+                if (contexts.putIfAbsent(listener, new KafkaListenerContext(listener)) != null) {
+                    logger.error("Attempting to add a listener that already exists: {}", listenerName(listener));
+                }
+            } else {
+                if (contexts.remove(listener) == null) {
+                    logger.error("Attempting to remove a listener context that doesn't exists: {}", listenerName(listener));
+                }
+            }
+        }
+    }
+
+    private final class KafkaListenerContext implements CloseListener<BatchReader<T>> {

Review comment:
       nit: is the `Kafka` prefix adding much value here?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {
+            if (ops == RegistrationOps.REGISTER) {
+                if (contexts.putIfAbsent(listener, new KafkaListenerContext(listener)) != null) {

Review comment:
       Could we have an INFO message for the happy paths in here?
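
       A rough sketch of what the happy-path logging could look like. This is only an illustration: `ListenerRegistry`, its nested `Listener` and `ListenerContext` types, and the in-memory `log` list are hypothetical stand-ins (the real code would use the existing `logger` and `listenerName(listener)`), and the INFO wording is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the listener bookkeeping in KafkaRaftClient.
class ListenerRegistry {
    // Hypothetical stand-ins for Listener<T> and its per-listener context.
    public interface Listener {}

    public static final class ListenerContext {
        final Listener listener;

        ListenerContext(Listener listener) {
            this.listener = listener;
        }
    }

    private final Map<Listener, ListenerContext> contexts = new HashMap<>();

    // Collected log lines stand in for the real logger so the sketch has no dependencies.
    public final List<String> log = new ArrayList<>();

    public void register(Listener listener) {
        if (contexts.putIfAbsent(listener, new ListenerContext(listener)) != null) {
            log.add("ERROR Attempting to add a listener that already exists");
        } else {
            // Happy path: the listener was not registered before.
            log.add("INFO Registered the listener");
        }
    }

    public void unregister(Listener listener) {
        if (contexts.remove(listener) == null) {
            log.add("ERROR Attempting to remove a listener context that doesn't exist");
        } else {
            // Happy path: the listener was registered and has now been removed.
            log.add("INFO Unregistered the listener");
        }
    }
}
```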

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2381,25 +2396,51 @@ public void complete() {
         }
     }
 
-    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+    private static enum RegistrationOps {
+        REGISTER, UNREGISTER
+    }
+
+    private final class Registration {
+        private final RegistrationOps ops;
+        private final Listener<T> listener;
+
+        private Registration(RegistrationOps ops, Listener<T> listener) {
+            this.ops = ops;
+            this.listener = listener;
+        }
+
+        private void update(Map<Listener<T>, KafkaListenerContext> contexts) {

Review comment:
       nit: passing `contexts` in as a parameter feels a tad unnatural, since this inner class already has an implicit reference to `listenerContexts`. Could we move this method into `KafkaRaftClient` and make `Registration` a static class?
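
       Roughly the shape I have in mind, as a sketch only. `RaftClientSketch`, `updateListeners`, and the boolean return value are hypothetical names and choices made for illustration; the real method would live in `KafkaRaftClient` and keep the existing logging.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KafkaRaftClient<T>.
class RaftClientSketch<T> {
    // Hypothetical stand-in for RaftClient.Listener<T>.
    public interface Listener<T> {}

    public enum RegistrationOps {
        REGISTER, UNREGISTER
    }

    // Static nested class: a plain value holder with no implicit reference to the client.
    public static final class Registration<T> {
        final RegistrationOps ops;
        final Listener<T> listener;

        public Registration(RegistrationOps ops, Listener<T> listener) {
            this.ops = ops;
            this.listener = listener;
        }
    }

    // Still an inner class, as in the original code.
    final class ListenerContext {
        final Listener<T> listener;

        ListenerContext(Listener<T> listener) {
            this.listener = listener;
        }
    }

    private final Map<Listener<T>, ListenerContext> listenerContexts = new HashMap<>();

    // The update logic lives on the client, which owns listenerContexts directly.
    // Returns true if the map changed (an assumption added so the sketch is easy to check).
    public boolean updateListeners(Registration<T> registration) {
        if (registration.ops == RegistrationOps.REGISTER) {
            return listenerContexts.putIfAbsent(
                registration.listener, new ListenerContext(registration.listener)) == null;
        } else {
            return listenerContexts.remove(registration.listener) != null;
        }
    }
}
```

       With this shape, `Registration` only describes the requested operation, and the map never needs to be passed around.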

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -89,15 +89,32 @@ default void beginShutdown() {}
     void initialize();
 
     /**
-     * Register a listener to get commit/leader notifications.
+     * Register a listener to get commit, snapshot and leader notifications.
      *
-     * @param listener the listener
+     * The implementation of this interface assumes that each call to {@code register} uses
+     * a different {@code Listener} instance. If the same instance is used for multiple calls
+     * to this method, then only one {@code Listener} will be registered.
+     *
+     * @param listener the listener to register
      */
     void register(Listener<T> listener);
 
+    /**
+     * Unregisters a listener.
+     *
+     * To distinguish from events that happend before the call to {@code unregister} and a future
+     * call to {@code register}, then different {@code Listener} instances must be used.

Review comment:
       nit: remove "then"



