Posted to jira@kafka.apache.org by "mumrah (via GitHub)" <gi...@apache.org> on 2023/01/31 20:21:05 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

mumrah commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092435602


##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
-  private[network] val startFuture = new CompletableFuture[Void]()
+  private[network] val startedFuture = new CompletableFuture[Void]()
 
   val thread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
-  startFuture.thenRun(() => synchronized {
-    if (!shouldRun.get()) {
-      debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
-    } else {
+  def start(): Unit = synchronized {
+    try {
+      if (!shouldRun.get()) {
+        throw new ClosedChannelException()

Review Comment:
   How does this happen? Is it when the Kafka server is shut down before the socket server has finished starting?
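
   One ordering that would reach this branch is sketched below. It is a simplified stand-in, not the actual `Acceptor`: `SketchAcceptor`, `beginShutdown`, and the `catch` clause are assumptions made for illustration, since the hunk above shows only the start of the `try` block.

```scala
import java.nio.channels.ClosedChannelException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for Acceptor; only the start/shutdown ordering is modeled.
class SketchAcceptor {
  private val shouldRun = new AtomicBoolean(true)
  val startedFuture = new CompletableFuture[Void]()

  // Assumed name: flips the flag the way a shutdown path would.
  def beginShutdown(): Unit = shouldRun.set(false)

  def start(): Unit = synchronized {
    try {
      if (!shouldRun.get()) {
        throw new ClosedChannelException()
      }
      // The real code would open the socket and start the thread here.
      startedFuture.complete(null)
    } catch {
      // Assumption: the failure is surfaced through the future.
      case e: ClosedChannelException => startedFuture.completeExceptionally(e)
    }
  }
}

object ShutdownBeforeStart {
  // Returns true if start() failed because shutdown won the race.
  def run(): Boolean = {
    val acceptor = new SketchAcceptor
    acceptor.beginShutdown() // shutdown arrives first
    acceptor.start()         // start() then observes shouldRun == false
    acceptor.startedFuture.isCompletedExceptionally
  }

  def main(args: Array[String]): Unit = println(run())
}
```

   In this sketch the exception is thrown only when shutdown is requested before `start()` runs, which is the case the question asks about.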



##########
core/src/main/scala/kafka/network/SocketServer.scala:
##########
@@ -573,7 +591,20 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private val listenBacklogSize = config.socketListenBacklogSize
 
   private val nioSelector = NSelector.open()
-  private[network] val serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
+
+  // If the port is configured as 0, we are using a random (ephemeral) port, so we need to open
+  // the socket before we can find out what port we have. If it is set to a nonzero value, defer
+  // opening the socket until we start the Acceptor. The reason for deferring the socket opening
+  // is so that systems which assume that the socket being open indicates readiness are not
+  // confused.
+  private[network] var serverChannel: ServerSocketChannel  = _
+  private[network] val localPort: Int  = if (endPoint.port != 0) {

Review Comment:
   It might be good to log when we open the server socket early, i.e. when the port is configured as 0.
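
   A rough sketch of what such logging could look like, using only `java.nio`. The helper `openIfEphemeral`, its `log` parameter, and the message text are hypothetical and are not taken from the PR; the real code would presumably go through `openServerSocket` and Kafka's own logging methods.

```scala
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

object EarlyOpenSketch {
  // Hypothetical helper: opens the socket immediately only when the configured
  // port is 0, and logs that it did so. Returns the channel (if opened) and
  // the port the listener will use.
  def openIfEphemeral(host: String,
                      configuredPort: Int,
                      log: String => Unit): (Option[ServerSocketChannel], Int) = {
    if (configuredPort != 0) {
      // Nonzero port: defer opening until the acceptor is started.
      (None, configuredPort)
    } else {
      val channel = ServerSocketChannel.open()
      channel.bind(new InetSocketAddress(host, 0))
      val boundPort = channel.socket().getLocalPort
      log(s"Opened server socket early on $host:$boundPort because the configured port is 0")
      (Some(channel), boundPort)
    }
  }
}
```

   A caller could pass `msg => info(msg)` as `log`, so the early-open case shows up in the broker log alongside the port that was actually bound.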



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
