Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/28 14:29:40 UTC

[GitHub] [kafka] ueisele opened a new pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

ueisele opened a new pull request #10935:
URL: https://github.com/apache/kafka/pull/10935


   In KRaft mode, Apache Kafka 2.8.0 advertises the socket port instead of the configured advertised port.
   
   A broker with the following configuration
   ```
   listeners=PUBLIC://0.0.0.0:19092,REPLICATION://0.0.0.0:9091
   advertised.listeners=PUBLIC://envoy-kafka-broker:9091,REPLICATION://kafka-broker1:9091
   ```
   advertises _envoy-kafka-broker:19092_ on the _PUBLIC_ listener; however, I would expect _envoy-kafka-broker:9091_ to be advertised. In ZooKeeper mode it works as expected.
   
   The reason is that BrokerServer currently uses the socket server port when registering with the controller: https://github.com/apache/kafka/blob/2beaf9a720330615bc5474ec079f8b4b105eff91/core/src/main/scala/kafka/server/BrokerServer.scala#L286
   
   The KafkaServer class, which is used in ZooKeeper mode, uses the configured advertised port: https://github.com/apache/kafka/blob/2beaf9a720330615bc5474ec079f8b4b105eff91/core/src/main/scala/kafka/server/KafkaServer.scala#L462
   
   I changed the BrokerServer class so that KRaft mode, like ZooKeeper mode, registers the configured advertised port.
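
To make the difference concrete, here is a minimal, self-contained sketch that models the two ports involved for the configuration shown earlier. It is not the BrokerServer code: `Endpoint`, `ListenerSketch`, `parseListeners`, `registeredBeforeFix`, and `registeredAfterFix` are hypothetical names introduced only for illustration, and the parsing assumes well-formed `NAME://host:port` entries.

```scala
// Hypothetical, simplified model of a listener endpoint; not Kafka's own EndPoint class.
final case class Endpoint(listenerName: String, host: String, port: Int)

object ListenerSketch {
  // Parses a value such as "PUBLIC://0.0.0.0:19092,REPLICATION://0.0.0.0:9091".
  // Assumption: every entry is a well-formed NAME://host:port triple.
  def parseListeners(value: String): Map[String, Endpoint] =
    value.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
      val parts = entry.split("://", 2)
      val name = parts(0)
      val hostPort = parts(1)
      val idx = hostPort.lastIndexOf(':')
      name -> Endpoint(name, hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
    }.toMap

  // The two settings from the pull request description.
  val listeners: Map[String, Endpoint] =
    parseListeners("PUBLIC://0.0.0.0:19092,REPLICATION://0.0.0.0:9091")
  val advertised: Map[String, Endpoint] =
    parseListeners("PUBLIC://envoy-kafka-broker:9091,REPLICATION://kafka-broker1:9091")

  // Behaviour reported for 2.8.0 in KRaft mode: advertised host, but the socket port.
  def registeredBeforeFix(name: String): String =
    s"${advertised(name).host}:${listeners(name).port}"

  // Behaviour proposed in this pull request: advertised host and advertised port.
  def registeredAfterFix(name: String): String =
    s"${advertised(name).host}:${advertised(name).port}"
}
```

In this model, `ListenerSketch.registeredBeforeFix("PUBLIC")` yields `envoy-kafka-broker:19092`, while `ListenerSketch.registeredAfterFix("PUBLIC")` yields `envoy-kafka-broker:9091`, which is what the proxy setup needs.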
   
   I tested it manually with a Docker Compose setup. It runs three Kafka brokers with Apache Kafka 2.8 in KRaft mode and an Envoy proxy in front of them. With Apache Kafka 2.8.0 it does not work, because Kafka does not advertise the configured advertised port. For more details about the setup see: https://github.com/ueisele/kafka/tree/fix/kraft-advertisedlisteners-build/proxy-examples/proxyl4-kafkakraft-bug-2.8
   
   The same Docker Compose setup with the fix proposed in this pull request works and advertises the configured advertised port. For more details see: https://github.com/ueisele/kafka/tree/fix/kraft-advertisedlisteners-build/proxy-examples/proxyl4-kafkakraft-fix-2.8
   
   At the moment there is no dedicated test for the BrokerServer class, so I have not created a test so far. Where should such a test be added? Is https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala a good place?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10935:
URL: https://github.com/apache/kafka/pull/10935


   





[GitHub] [kafka] ijuma commented on a change in pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10935:
URL: https://github.com/apache/kafka/pull/10935#discussion_r659855125



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -283,7 +283,7 @@ class BrokerServer(
         networkListeners.add(new Listener().
           setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
           setName(ep.listenerName.value()).
-          setPort(socketServer.boundPort(ep.listenerName)).
+          setPort(ep.port).

Review comment:
       Thanks for the PR. Can you please add a test?







[GitHub] [kafka] ueisele commented on a change in pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

Posted by GitBox <gi...@apache.org>.
ueisele commented on a change in pull request #10935:
URL: https://github.com/apache/kafka/pull/10935#discussion_r667498348



##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -313,4 +318,94 @@ class RaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testCreateClusterWithAdvertisedPortZero(): Unit = {
+    val nodes = new TestKitNodes.Builder()
+      .setNumControllerNodes(1)
+      .setNumBrokerNodes(3)
+      .build()
+    nodes.brokerNodes().values().forEach { broker =>
+      broker.propertyOverrides().put(KafkaConfig.ListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+      broker.propertyOverrides().put(KafkaConfig.AdvertisedListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+    }
+    val cluster = new KafkaClusterTestKit.Builder(nodes).build()
+    try {
+      cluster.format()
+      cluster.startup()
+
+      val (runningBrokerServer, foundRunningBroker) = TestUtils.computeUntilTrue(anyBrokerServer(cluster)){
+        brokerServer => brokerServer.currentState() == BrokerState.RUNNING
+      }
+      assertTrue(foundRunningBroker, "No Broker never made it to RUNNING state.")
+
+      val (describeClusterResponse, metadataUpToDate) = TestUtils.computeUntilTrue(
+        sendDescribeClusterRequest(runningBrokerServer.socketServer, nodes.externalListenerName())
+      ) {
+        response => response.nodes().size() == nodes.brokerNodes().size()
+      }
+      assertTrue(metadataUpToDate, s"Broker never reached expected cluster size of ${nodes.brokerNodes().size()}")

Review comment:
       I refactored the two tests and also took your other two remarks into account.







[GitHub] [kafka] ijuma commented on pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10935:
URL: https://github.com/apache/kafka/pull/10935#issuecomment-872330571


   @ueisele quick heads up that most committers are pretty busy this week due to the 3.0 feature freeze. We should be able to review and merge this next week.





[GitHub] [kafka] ueisele commented on a change in pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

Posted by GitBox <gi...@apache.org>.
ueisele commented on a change in pull request #10935:
URL: https://github.com/apache/kafka/pull/10935#discussion_r661744976



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -283,7 +283,7 @@ class BrokerServer(
         networkListeners.add(new Listener().
           setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
           setName(ep.listenerName.value()).
-          setPort(socketServer.boundPort(ep.listenerName)).
+          setPort(ep.port).

Review comment:
       I added two tests to the RaftClusterTest class. One verifies that the configured advertised host and port are registered if the port is non-zero. The other verifies that if the advertised port is 0, the bound port is used as the actual advertised port. This is required to support binding the listener to any free port, which is used, for example, by the integration tests.
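
The rule described in this comment can be sketched as a small pure function. This is only an illustration under stated assumptions: `AdvertisedPortSketch` and `portToRegister` are hypothetical names, not part of Kafka, and the code in the pull request may be structured differently.

```scala
object AdvertisedPortSketch {
  // Hypothetical helper: chooses the port a broker would register with the controller.
  //   advertisedPort: port taken from advertised.listeners (0 stands for "any free port")
  //   boundPort:      port the socket server actually bound for the same listener
  def portToRegister(advertisedPort: Int, boundPort: Int): Int =
    if (advertisedPort == 0) boundPort else advertisedPort
}
```

For example, `AdvertisedPortSketch.portToRegister(9091, 19092)` returns 9091, the configured advertised port, whereas `AdvertisedPortSketch.portToRegister(0, 54321)` returns 54321, the bound port, because a port of 0 cannot be handed to clients.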







[GitHub] [kafka] ueisele commented on a change in pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

Posted by GitBox <gi...@apache.org>.
ueisele commented on a change in pull request #10935:
URL: https://github.com/apache/kafka/pull/10935#discussion_r660063967



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -283,7 +283,7 @@ class BrokerServer(
         networkListeners.add(new Listener().
           setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
           setName(ep.listenerName.value()).
-          setPort(socketServer.boundPort(ep.listenerName)).
+          setPort(ep.port).

Review comment:
       Yes, sure. I will add a test.







[GitHub] [kafka] hachikuji commented on a change in pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10935:
URL: https://github.com/apache/kafka/pull/10935#discussion_r665713457



##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -313,4 +318,94 @@ class RaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testCreateClusterWithAdvertisedPortZero(): Unit = {
+    val nodes = new TestKitNodes.Builder()
+      .setNumControllerNodes(1)
+      .setNumBrokerNodes(3)
+      .build()
+    nodes.brokerNodes().values().forEach { broker =>
+      broker.propertyOverrides().put(KafkaConfig.ListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+      broker.propertyOverrides().put(KafkaConfig.AdvertisedListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+    }
+    val cluster = new KafkaClusterTestKit.Builder(nodes).build()
+    try {
+      cluster.format()
+      cluster.startup()
+
+      val (runningBrokerServer, foundRunningBroker) = TestUtils.computeUntilTrue(anyBrokerServer(cluster)){
+        brokerServer => brokerServer.currentState() == BrokerState.RUNNING
+      }
+      assertTrue(foundRunningBroker, "No Broker never made it to RUNNING state.")

Review comment:
       nit: never -> ever?

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -313,4 +318,94 @@ class RaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testCreateClusterWithAdvertisedPortZero(): Unit = {
+    val nodes = new TestKitNodes.Builder()
+      .setNumControllerNodes(1)
+      .setNumBrokerNodes(3)
+      .build()
+    nodes.brokerNodes().values().forEach { broker =>
+      broker.propertyOverrides().put(KafkaConfig.ListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+      broker.propertyOverrides().put(KafkaConfig.AdvertisedListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+    }
+    val cluster = new KafkaClusterTestKit.Builder(nodes).build()
+    try {
+      cluster.format()
+      cluster.startup()
+
+      val (runningBrokerServer, foundRunningBroker) = TestUtils.computeUntilTrue(anyBrokerServer(cluster)){
+        brokerServer => brokerServer.currentState() == BrokerState.RUNNING
+      }
+      assertTrue(foundRunningBroker, "No Broker never made it to RUNNING state.")
+
+      val (describeClusterResponse, metadataUpToDate) = TestUtils.computeUntilTrue(
+        sendDescribeClusterRequest(runningBrokerServer.socketServer, nodes.externalListenerName())
+      ) {
+        response => response.nodes().size() == nodes.brokerNodes().size()
+      }
+      assertTrue(metadataUpToDate, s"Broker never reached expected cluster size of ${nodes.brokerNodes().size()}")

Review comment:
       nit: these tests are basically the same except for the initialization and assertions. It would be nice to factor out a helper or two.

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -313,4 +318,94 @@ class RaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testCreateClusterWithAdvertisedPortZero(): Unit = {
+    val nodes = new TestKitNodes.Builder()
+      .setNumControllerNodes(1)
+      .setNumBrokerNodes(3)
+      .build()
+    nodes.brokerNodes().values().forEach { broker =>
+      broker.propertyOverrides().put(KafkaConfig.ListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+      broker.propertyOverrides().put(KafkaConfig.AdvertisedListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+    }
+    val cluster = new KafkaClusterTestKit.Builder(nodes).build()
+    try {
+      cluster.format()
+      cluster.startup()
+
+      val (runningBrokerServer, foundRunningBroker) = TestUtils.computeUntilTrue(anyBrokerServer(cluster)){
+        brokerServer => brokerServer.currentState() == BrokerState.RUNNING
+      }
+      assertTrue(foundRunningBroker, "No Broker never made it to RUNNING state.")
+
+      val (describeClusterResponse, metadataUpToDate) = TestUtils.computeUntilTrue(
+        sendDescribeClusterRequest(runningBrokerServer.socketServer, nodes.externalListenerName())
+      ) {
+        response => response.nodes().size() == nodes.brokerNodes().size()
+      }
+      assertTrue(metadataUpToDate, s"Broker never reached expected cluster size of ${nodes.brokerNodes().size()}")
+
+      describeClusterResponse.nodes().values().forEach { node =>
+        assertEquals("localhost", node.host,
+          "Did not advertise configured advertised host")
+        assertEquals(cluster.brokers().get(node.id).socketServer.boundPort(nodes.externalListenerName()), node.port,
+          "Did not advertise bound socket port")
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer(): Unit = {
+    val nodes = new TestKitNodes.Builder()
+      .setNumControllerNodes(1)
+      .setNumBrokerNodes(3)
+      .build()
+    nodes.brokerNodes().values().forEach { broker =>
+      broker.propertyOverrides().put(KafkaConfig.ListenersProp,
+        s"${nodes.externalListenerName().value()}://localhost:0")
+      broker.propertyOverrides().put(KafkaConfig.AdvertisedListenersProp,
+        s"${nodes.externalListenerName().value()}://advertised-host-${broker.id}:${broker.id + 100}")
+    }
+    val cluster = new KafkaClusterTestKit.Builder(nodes).build()
+    try {
+      cluster.format()
+      cluster.startup()
+
+      val (runningBrokerServer, foundRunningBroker) = TestUtils.computeUntilTrue(anyBrokerServer(cluster)){
+        brokerServer => brokerServer.currentState() == BrokerState.RUNNING
+      }
+      assertTrue(foundRunningBroker, "No Broker never made it to RUNNING state.")
+
+      val (describeClusterResponse, metadataUpToDate) = TestUtils.computeUntilTrue(
+        sendDescribeClusterRequest(runningBrokerServer.socketServer, nodes.externalListenerName())

Review comment:
       Maybe worth a comment that `connectAndReceive` connects to the bound port instead of the advertised port since that is not obvious.







[GitHub] [kafka] ueisele commented on pull request #10935: KAFKA-13003: In kraft mode also advertise configured advertised port instead of socket port

Posted by GitBox <gi...@apache.org>.
ueisele commented on pull request #10935:
URL: https://github.com/apache/kafka/pull/10935#issuecomment-877829585


   I checked the failing tests. To me it looks like they are not related to the change made in this pull request.
   The failing tests are "RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions", for which there is already a ticket (https://issues.apache.org/jira/browse/KAFKA-12629), and "RaftEventSimulationTest". I do not know why the latter fails, but it does not use BrokerServer.




