You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/09 22:05:24 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #11969: KAFKA-13649: Implement early.start.listeners and fix StandardAuthorizer loading

hachikuji commented on code in PR #11969:
URL: https://github.com/apache/kafka/pull/11969#discussion_r868426504


##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1486,13 +1486,16 @@ class KafkaConfigTest {
     assertEquals("3", originals.get(KafkaConfig.NodeIdProp))
   }
 
+  val kraftProps = new Properties()

Review Comment:
   nit: it would be a little nicer to make this a def. We don't need the properties after they are initialized.



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1509,4 +1512,29 @@ class KafkaConfigTest {
     assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp))
     assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp))
   }
+
+  @Test

Review Comment:
   May be useful testing that the default uses the controller listeners?
   
   As a side comment, I wonder if the default of the controller listeners should only apply when the standard authorizer is in use? Or perhaps a warning of some kind if we have a non-standard authorizer and `early.start.listeners` is not set.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##########
@@ -78,6 +77,24 @@ public AclMutator aclMutatorOrException() {
         return aclMutator;
     }
 
+    @Override
+    public synchronized void completeInitialLoad() {
+        data = data.copyWithNewLoadingComplete(true);
+        data.log.info("Completed initial ACL load process.");
+        initialLoadFuture.complete(null);
+    }
+
+    // Visible for testing

Review Comment:
   nit: can probably make it default access?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -921,12 +925,35 @@ private void appendRaftEvent(String name, Runnable runnable) {
                 if (this != metaLogListener) {
                     log.debug("Ignoring {} raft event from an old registration", name);
                 } else {
-                    runnable.run();
+                    try {
+                        runnable.run();
+                    } finally {
+                        maybeCompleteAuthorizerInitialLoad();
+                    }
                 }
             });
         }
     }
 
+    private void maybeCompleteAuthorizerInitialLoad() {
+        if (!needToCompleteAuthorizerLoad) return;
+        OptionalLong highWatermark = raftClient.highWatermark();
+        if (highWatermark.isPresent()) {
+            if (lastCommittedOffset + 1 >= highWatermark.getAsLong()) {
+                log.debug("maybeCompleteAuthorizerInitialLoad: completing authorizer " +

Review Comment:
   I think it would be reasonable to have an INFO message about this event. It only happens once and I suspect it will be useful to know when it completed.



##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -251,6 +273,9 @@ public AuthorizationResult authorize(
         if (superUsers.contains(principal.toString())) {
             rule = SuperUserRule.INSTANCE;
         } else {
+            if (!loadingComplete) {

Review Comment:
   nit: maybe pull this up one level?
   ```
   } else if (!loadingComplete) {
     throw new AuthorizerNotReadyException();
   } else {
   ...



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -725,7 +727,11 @@ object KafkaConfig {
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements <code>${classOf[Authorizer].getName}</code>" +
-  " interface, which is used by the broker for authorization."
+    " interface, which is used by the broker for authorization."
+  val EarlyStartListenersDoc = "A comma-separated list of listener names which should be started before any non-early start listeners. " +

Review Comment:
   nit: the first sentence is seems circular. I think it would be helpful to mention the authorizer use case. Also, can we say something about the default?



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -1864,6 +1780,18 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
       sensor
     }
   }
+
+  /**
+   * Close `channel` and decrement the connection count.
+   */
+  def closeChannel(listenerName: ListenerName, channel: SocketChannel): Unit = {
+    if (channel != null) {
+      debug(s"Closing connection from ${channel.socket.getRemoteSocketAddress}")
+      dec(listenerName, channel.socket.getInetAddress)
+      closeSocket(channel, this)

Review Comment:
   Yes, I'm not disputing the need to register the close event with `ConnectionQuotas`. But I think it is a strange inversion of responsibility to have it close the socket as well. The `Processor` and `Acceptor` classes are the ones that own the socket lifecycle, while the `ConnectionQuotas` instance is just a parameter. The original implementation was more intuitive. 
   
   It is admittedly a small thing, but I do think the logging issue matters. When we are debugging events in system test failures, for example, grepping the logger is a common strategy. It is surprising to need to include the connection quota object to see all of the close events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org