Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/14 08:09:05 UTC

[GitHub] [kafka] ijuma opened a new pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

ijuma opened a new pull request #10872:
URL: https://github.com/apache/kafka/pull/10872


   They have been deprecated since 0.10.0. Full list of removed configs:
   * port
   * host.name
   * advertised.port
   * advertised.host.name
   
   Also adjust tests to take the removals into account.
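
For anyone updating a broker configuration, the removed settings map onto `listeners` and `advertised.listeners`. The following is a minimal sketch of that mapping, assuming a plain `java.util.Properties` broker config. The class `ListenerMigration` and the helper `toListeners` are hypothetical names used for illustration only, not Kafka APIs; the host and port values mirror the ones used in `KafkaConfigTest` in this PR.

```java
import java.util.Properties;

class ListenerMigration {
    // Hypothetical helper (not a Kafka API): builds a PLAINTEXT listener URI
    // from the values that used to go into host.name and port.
    static String toListeners(String host, int port) {
        return "PLAINTEXT://" + host + ":" + port;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("broker.id", "1");
        props.setProperty("zookeeper.connect", "localhost:2181");

        // Removed in 3.0: host.name=fake-host and port=9999
        props.setProperty("listeners", toListeners("fake-host", 9999));

        // Removed in 3.0: advertised.host.name and advertised.port
        props.setProperty("advertised.listeners", toListeners("fake-host", 9999));

        System.out.println(props.getProperty("listeners"));
    }
}
```

The sketch only covers the PLAINTEXT case; a broker with other security protocols would use the matching listener name in the URI instead.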
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#issuecomment-860227370









[GitHub] [kafka] ijuma commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r652327781



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##########
@@ -123,24 +123,23 @@ public void start() {
     }
 
     private void start(int[] brokerPorts, String[] logDirs) {
-        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
 
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
+        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
 
-        Object listenerConfig = brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+        Object listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
         if (listenerConfig != null) {
             listenerName = new ListenerName(listenerConfig.toString());
         }
 
         for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+            brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
             currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
-            brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), currentBrokerLogDirs[i]);
-            brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
+            brokerConfig.put(KafkaConfig.LogDirProp(), currentBrokerLogDirs[i]);
+            putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + brokerPorts[i]);

Review comment:
       Thanks for investigating this. The line links don't work for me. Which file do the lines refer to?







[GitHub] [kafka] dajac commented on pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#issuecomment-860169526


   Thanks @ijuma. I started working on this a while ago but haven't had the time to finish it yet: https://github.com/apache/kafka/pull/10437. I will close it and review yours.





[GitHub] [kafka] ijuma commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r651803100



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##########
@@ -88,20 +88,20 @@ public void start() throws IOException {
         zookeeper = new EmbeddedZookeeper();
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
 
-        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
-        brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 5);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), 5);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
+        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);

Review comment:
       Yes, agreed.







[GitHub] [kafka] showuon commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r652329369



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##########
@@ -123,24 +123,23 @@ public void start() {
     }
 
     private void start(int[] brokerPorts, String[] logDirs) {
-        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
 
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
+        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
 
-        Object listenerConfig = brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+        Object listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
         if (listenerConfig != null) {
             listenerName = new ListenerName(listenerConfig.toString());
         }
 
         for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+            brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
             currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
-            brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), currentBrokerLogDirs[i]);
-            brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
+            brokerConfig.put(KafkaConfig.LogDirProp(), currentBrokerLogDirs[i]);
+            putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + brokerPorts[i]);

Review comment:
       This method is what I'm referring to:
   ```
   /**
    * Starts the Kafka cluster alone using the ports that were assigned during initialization of
    * the harness.
    *
    * @throws ConnectException if a directory to store the data cannot be created
    */
   public void startOnlyKafkaOnSamePorts() {
       start(currentBrokerPorts, currentBrokerLogDirs);
   }
   ```







[GitHub] [kafka] ijuma commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r650636358



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -480,7 +480,8 @@ class KafkaServer(
         endpoint
     )
 
-    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "
+    ").toInt

Review comment:
       Yeah, this was an unintentional change in the last commit. I'll fix it along with the test issues.







[GitHub] [kafka] dajac commented on pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#issuecomment-861653253


   @ijuma Overall, the PR LGTM. There are a few Connect-related test failures in the last Jenkins run. This is suspicious... I haven't had the time to really investigate them though.





[GitHub] [kafka] ijuma commented on pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#issuecomment-861667531


   @dajac Most of them pass locally. I am re-running them and will look further into the ones that are not flaky.





[GitHub] [kafka] showuon commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r650627746



##########
File path: docs/upgrade.html
##########
@@ -25,6 +25,8 @@ <h5><a id="upgrade_300_notable" href="#upgrade_300_notable">Notable changes in 3
     <li>A number of implementation dependency jars are <a href="https://github.com/apache/kafka/pull/10203">now available in the runtime classpath
         instead of compile and runtime classpaths</a>. Compilation errors after the upgrade can be fixed by adding the missing dependency jar(s) explicitly
         or updating the application not to use internal classes.</li>
+    <li>The default value for the consumer configuration <code>session.timeout.ms</code> was increased from 10s to 45s. See
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout">KIP-735</a> for more details.</li>

Review comment:
       Aha, good catch adding the 45s default session timeout!

##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -145,69 +145,35 @@ class KafkaConfigTest {
 
   @Test
   def testAdvertiseDefaults(): Unit = {
-    val port = "9999"
+    val port = 9999
     val hostName = "fake-host"
-
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.remove(KafkaConfig.ListenersProp)
-    props.put(KafkaConfig.HostNameProp, hostName)
-    props.put(KafkaConfig.PortProp, port)
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, "1")
+    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.put(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")

Review comment:
       Nice test updates!

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -480,7 +480,8 @@ class KafkaServer(
         endpoint
     )
 
-    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
+    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "
+    ").toInt

Review comment:
       This looks like a typo. It caused a **compile error**!







[GitHub] [kafka] dajac commented on pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#issuecomment-860790267


   > @dajac I updated this PR to hopefully be a combination of our changes. I'll assign the JIRA to you since your PR was first and, if you're happy with it, we can merge this with both of us as co-authors. Once again, my bad for duplicating work here.
   
   Yeah, that works for me. No worries ;)





[GitHub] [kafka] dajac commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r651082142



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##########
@@ -88,20 +88,20 @@ public void start() throws IOException {
         zookeeper = new EmbeddedZookeeper();
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
 
-        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
-        brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 5);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), 5);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
+        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);

Review comment:
       Should we only set it if not provided? That could explain why `ResetIntegrationWithSslTest.classMethod` failed.







[GitHub] [kafka] showuon commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r652329162



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##########
@@ -123,24 +123,23 @@ public void start() {
     }
 
     private void start(int[] brokerPorts, String[] logDirs) {
-        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
 
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
+        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
 
-        Object listenerConfig = brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+        Object listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
         if (listenerConfig != null) {
             listenerName = new ListenerName(listenerConfig.toString());
         }
 
         for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+            brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
             currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
-            brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), currentBrokerLogDirs[i]);
-            brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
+            brokerConfig.put(KafkaConfig.LogDirProp(), currentBrokerLogDirs[i]);
+            putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + brokerPorts[i]);

Review comment:
       `connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java` it is.







[GitHub] [kafka] showuon commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r652324325



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##########
@@ -123,24 +123,23 @@ public void start() {
     }
 
     private void start(int[] brokerPorts, String[] logDirs) {
-        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
 
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
+        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
 
-        Object listenerConfig = brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+        Object listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
         if (listenerConfig != null) {
             listenerName = new ListenerName(listenerConfig.toString());
         }
 
         for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+            brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
             currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
-            brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), currentBrokerLogDirs[i]);
-            brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
+            brokerConfig.put(KafkaConfig.LogDirProp(), currentBrokerLogDirs[i]);
+            putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:" + brokerPorts[i]);

Review comment:
       @ijuma, I found why the test `testBrokerCoordinator` failed. It's because we use `putIfAbsent` here, but `testBrokerCoordinator` needs to bind to the previously allocated port ([line 114](https://github.com/apache/kafka/pull/10872/files#diff-234389ba52863064119a9fbb8d1649d6a039a28790b7c186357e60570b0af049L114)). With `putIfAbsent`, the broker keeps the default port `0` and gets a randomly assigned port, as at [line 120](https://github.com/apache/kafka/pull/10872/files#diff-234389ba52863064119a9fbb8d1649d6a039a28790b7c186357e60570b0af049R120). So we should `put` the property directly here to fix it. FYI
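
The difference between the two calls can be sketched in isolation. This is a simplified illustration, not the actual `EmbeddedKafkaCluster` code: it assumes the same config object is reused across starts and that the first start stored a listener with port `0`. The class `ListenerRestartSketch`, the method `listenersAfterRestart`, and the port `54321` are hypothetical.

```java
import java.util.Properties;

class ListenerRestartSketch {
    // Same semantics as the putIfAbsent helper in the diff: only set the key when it is missing.
    static void putIfAbsent(Properties props, String key, Object value) {
        if (!props.containsKey(key)) {
            props.put(key, value);
        }
    }

    static String listener(int port) {
        return "PLAINTEXT://localhost:" + port;
    }

    // Simulates a first start followed by a restart against the same config object,
    // and returns the listeners value the restarted broker would see.
    static Object listenersAfterRestart(boolean overwrite, int allocatedPort) {
        Properties brokerConfig = new Properties();

        // First start: port 0 asks for a randomly assigned port.
        putIfAbsent(brokerConfig, "listeners", listener(0));

        // Restart: the caller wants to bind to the port allocated on the first start.
        if (overwrite) {
            brokerConfig.put("listeners", listener(allocatedPort));
        } else {
            // No-op here, because the key is already present from the first start.
            putIfAbsent(brokerConfig, "listeners", listener(allocatedPort));
        }
        return brokerConfig.get("listeners");
    }

    public static void main(String[] args) {
        System.out.println(listenersAfterRestart(false, 54321)); // PLAINTEXT://localhost:0
        System.out.println(listenersAfterRestart(true, 54321));  // PLAINTEXT://localhost:54321
    }
}
```

Under these assumptions, only the overwriting `put` carries the previously allocated port into the restarted broker's config.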



