You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/04 22:43:53 UTC

[1/5] git commit: KAFKA-675 Allow the user to override the host that we bind to. Patch from Matan Amir with slight changes to improve error messages for a bad host or port.

KAFKA-675 Allow the user to override the host that we bind to. Patch from Matan Amir<ma...@voxer.com> with slight changes
to improve error messages for a bad host or port.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6d41102
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6d41102
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6d41102

Branch: refs/heads/trunk
Commit: c6d41102d81ac48b345d3f42d669e6a6bbfbe062
Parents: 85ec044
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Dec 20 14:13:22 2012 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Dec 20 14:13:22 2012 -0800

----------------------------------------------------------------------
 config/server.properties                           |   11 ++--
 .../main/scala/kafka/network/SocketServer.scala    |   40 +++++++++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala |    6 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    1 +
 .../main/scala/kafka/server/KafkaZooKeeper.scala   |    8 ++-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    2 +-
 .../unit/kafka/network/SocketServerTest.scala      |    1 +
 7 files changed, 48 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index e92f599..f4521fb 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -19,17 +19,16 @@
 # The id of the broker. This must be set to a unique integer for each broker.
 brokerid=0
 
-# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
-# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
-# may not be what you want.
-#hostname=
-
-
 ############################# Socket Server Settings #############################
 
 # The port the socket server listens on
 port=9092
 
+# Hostname the broker will bind to and advertise to producers and consumers.
+# If not set, the server will bind to all interfaces and advertise the value returned from
+# from java.net.InetAddress.getCanonicalHostName().
+#hostname=localhost
+
 # The number of threads handling network requests
 network.threads=2
  

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 9cdadd7..2102fbf 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -23,6 +23,7 @@ import java.net._
 import java.io._
 import java.nio.channels._
 
+import kafka.common.KafkaException
 import kafka.utils._
 
 /**
@@ -32,6 +33,7 @@ import kafka.utils._
  *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
  */
 class SocketServer(val brokerId: Int,
+                   val host: String,
                    val port: Int,
                    val numProcessorThreads: Int, 
                    val maxQueuedRequests: Int,
@@ -39,7 +41,7 @@ class SocketServer(val brokerId: Int,
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
-  private var acceptor: Acceptor = new Acceptor(port, processors)
+  @volatile private var acceptor: Acceptor = null
   val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
   /**
@@ -54,6 +56,7 @@ class SocketServer(val brokerId: Int,
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
    
     // start accepting connections
+    this.acceptor = new Acceptor(host, port, processors)
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
     info("started")
@@ -64,10 +67,11 @@ class SocketServer(val brokerId: Int,
    */
   def shutdown() = {
     info("shutting down")
-    acceptor.shutdown
+    if(acceptor != null)
+      acceptor.shutdown()
     for(processor <- processors)
-      processor.shutdown
-    info("shutted down completely")
+      processor.shutdown()
+    info("shut down completely")
   }
 }
 
@@ -123,17 +127,14 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
-private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+  val serverChannel = openServerSocket(host, port)
 
   /**
    * Accept loop that checks for new connection attempts
    */
   def run() {
-    val serverChannel = ServerSocketChannel.open()
-    serverChannel.configureBlocking(false)
-    serverChannel.socket.bind(new InetSocketAddress(port))
     serverChannel.register(selector, SelectionKey.OP_ACCEPT);
-    info("Awaiting connections on port " + port)
     startupComplete()
     var currentProcessor = 0
     while(isRunning) {
@@ -164,6 +165,27 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce
     swallowError(selector.close())
     shutdownComplete()
   }
+  
+  /*
+   * Create a server socket to listen for connections on.
+   */
+  def openServerSocket(host: String, port: Int): ServerSocketChannel = {
+    val socketAddress = 
+      if(host == null || host.trim.isEmpty)
+        new InetSocketAddress(port)
+      else
+        new InetSocketAddress(host, port)
+    val serverChannel = ServerSocketChannel.open()
+    serverChannel.configureBlocking(false)
+    try {
+      serverChannel.socket.bind(socketAddress)
+      info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port))
+    } catch {
+      case e: SocketException => 
+        throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
+    }
+    serverChannel
+  }
 
   /*
    * Accept a new connection

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5754676..962b65f 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,7 +20,6 @@ package kafka.server
 import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
-import java.net.InetAddress
 import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
 
 /**
@@ -56,8 +55,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the port to listen and accept connections on */
   val port: Int = props.getInt("port", 6667)
 
-  /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
-  val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress)
+  /* hostname of broker. If this is set, it will only bind to this address.  If this is not set,
+   * it will bind to all interfaces, and publish one to ZK */
+  val hostName: String = props.getString("hostname", null)
 
   /* the SO_SNDBUFF buffer of the socket sever sockets */
   val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d444d22..ae35e4f 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -62,6 +62,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     logManager.startup()
 
     socketServer = new SocketServer(config.brokerId,
+                                    config.hostName,
                                     config.port,
                                     config.numNetworkThreads,
                                     config.numQueuedRequests,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index e1c11f2..42f8239 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -21,6 +21,7 @@ import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 import kafka.common._
+import java.net.InetAddress
 
 
 /**
@@ -41,8 +42,11 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
    }
 
   private def registerBrokerInZk() {
-    info("Registering broker " + brokerIdPath)
-    val hostName = config.hostName
+    val hostName = 
+      if(config.hostName == null || config.hostName.trim.isEmpty) 
+        InetAddress.getLocalHost.getCanonicalHostName 
+      else
+        config.hostName 
     ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 358c4fd..eabedd0 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -189,7 +189,7 @@ object ZkUtils extends Logging {
       case e: ZkNodeExistsException =>
         throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
     }
-    info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+    info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 3b5ec7f..7395cbc 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -34,6 +34,7 @@ import kafka.message.ByteBufferMessageSet
 class SocketServerTest extends JUnitSuite {
 
   val server: SocketServer = new SocketServer(0,
+                                              host = null,
                                               port = TestUtils.choosePort,
                                               numProcessorThreads = 1,
                                               maxQueuedRequests = 50,