You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/07 13:02:40 UTC

[GitHub] [kafka] dengziming commented on a diff in pull request #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller

dengziming commented on code in PR #12961:
URL: https://github.com/apache/kafka/pull/12961#discussion_r1042151200


##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1665,6 +1665,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty
 
   val migrationEnabled: Boolean = getBoolean(KafkaConfig.MigrationEnabledProp)
+  def enableZkApiForwarding: Boolean = migrationEnabled && interBrokerProtocolVersion.isApiForwardingSupported

Review Comment:
   Why should we check `migrationEnabled` here? does this mean we will not enable forward in zk mode except for migration?



##########
clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java:
##########
@@ -43,6 +49,8 @@ protected AbstractControlRequest(ApiKeys api, short version) {
 
     public abstract int controllerId();
 
+    public abstract int kraftControllerId();

Review Comment:
   I didn't see how is this field used in `KafkaApis`.



##########
core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala:
##########
@@ -148,13 +149,15 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk", "zkMigration"))
   def testNotController(quorum: String): Unit = {
     // Note: we don't run this test when in KRaft mode, because KRaft doesn't have this
     // behavior of returning NOT_CONTROLLER. Instead, the request is forwarded.
     val req = topicsReq(Seq(topicReq("topic1")))
     val response = sendCreateTopicRequest(req, notControllerSocketServer)
-    assertEquals(1, response.errorCounts().get(Errors.NOT_CONTROLLER))
+    response.data().topics().asScala.foreach(topic => topic.errorCode())

Review Comment:
   This line is unused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org