You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/21 04:37:26 UTC
git commit: KAFKA-675 Allow the user to override the host that we
bind to. Patch from Matan Amir with slight changes to
improve error messages for a bad host or port.
Updated Branches:
refs/heads/KAFKA-675 [created] c6d41102d
KAFKA-675 Allow the user to override the host that we bind to. Patch from Matan Amir <ma...@voxer.com> with slight changes
to improve error messages for a bad host or port.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6d41102
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6d41102
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6d41102
Branch: refs/heads/KAFKA-675
Commit: c6d41102d81ac48b345d3f42d669e6a6bbfbe062
Parents: 85ec044
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Dec 20 14:13:22 2012 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Dec 20 14:13:22 2012 -0800
----------------------------------------------------------------------
config/server.properties | 11 ++--
.../main/scala/kafka/network/SocketServer.scala | 40 +++++++++++---
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 1 +
.../main/scala/kafka/server/KafkaZooKeeper.scala | 8 ++-
core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +-
.../unit/kafka/network/SocketServerTest.scala | 1 +
7 files changed, 48 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index e92f599..f4521fb 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -19,17 +19,16 @@
# The id of the broker. This must be set to a unique integer for each broker.
brokerid=0
-# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
-# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost
-# may not be what you want.
-#hostname=
-
-
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
+# Hostname the broker will bind to and advertise to producers and consumers.
+# If not set, the server will bind to all interfaces and advertise the value returned
+# from java.net.InetAddress.getCanonicalHostName().
+#hostname=localhost
+
# The number of threads handling network requests
network.threads=2
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 9cdadd7..2102fbf 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -23,6 +23,7 @@ import java.net._
import java.io._
import java.nio.channels._
+import kafka.common.KafkaException
import kafka.utils._
/**
@@ -32,6 +33,7 @@ import kafka.utils._
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/
class SocketServer(val brokerId: Int,
+ val host: String,
val port: Int,
val numProcessorThreads: Int,
val maxQueuedRequests: Int,
@@ -39,7 +41,7 @@ class SocketServer(val brokerId: Int,
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
- private var acceptor: Acceptor = new Acceptor(port, processors)
+ @volatile private var acceptor: Acceptor = null
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
/**
@@ -54,6 +56,7 @@ class SocketServer(val brokerId: Int,
requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
// start accepting connections
+ this.acceptor = new Acceptor(host, port, processors)
Utils.newThread("kafka-acceptor", acceptor, false).start()
acceptor.awaitStartup
info("started")
@@ -64,10 +67,11 @@ class SocketServer(val brokerId: Int,
*/
def shutdown() = {
info("shutting down")
- acceptor.shutdown
+ if(acceptor != null)
+ acceptor.shutdown()
for(processor <- processors)
- processor.shutdown
- info("shutted down completely")
+ processor.shutdown()
+ info("shut down completely")
}
}
@@ -123,17 +127,14 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging
/**
* Thread that accepts and configures new connections. There is only need for one of these
*/
-private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
+ val serverChannel = openServerSocket(host, port)
/**
* Accept loop that checks for new connection attempts
*/
def run() {
- val serverChannel = ServerSocketChannel.open()
- serverChannel.configureBlocking(false)
- serverChannel.socket.bind(new InetSocketAddress(port))
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
- info("Awaiting connections on port " + port)
startupComplete()
var currentProcessor = 0
while(isRunning) {
@@ -164,6 +165,27 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce
swallowError(selector.close())
shutdownComplete()
}
+
+ /*
+ * Create a server socket to listen for connections on.
+ */
+ def openServerSocket(host: String, port: Int): ServerSocketChannel = {
+ val socketAddress =
+ if(host == null || host.trim.isEmpty)
+ new InetSocketAddress(port)
+ else
+ new InetSocketAddress(host, port)
+ val serverChannel = ServerSocketChannel.open()
+ serverChannel.configureBlocking(false)
+ try {
+ serverChannel.socket.bind(socketAddress)
+ info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port))
+ } catch {
+ case e: SocketException =>
+ throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e)
+ }
+ serverChannel
+ }
/*
* Accept a new connection
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5754676..962b65f 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,7 +20,6 @@ package kafka.server
import java.util.Properties
import kafka.message.Message
import kafka.consumer.ConsumerConfig
-import java.net.InetAddress
import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
/**
@@ -56,8 +55,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the port to listen and accept connections on */
val port: Int = props.getInt("port", 6667)
- /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
- val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress)
+ /* hostname of broker. If this is set, it will only bind to this address. If this is not set,
+ * it will bind to all interfaces, and publish the local host's canonical hostname to ZK */
+ val hostName: String = props.getString("hostname", null)
/* the SO_SNDBUFF buffer of the socket sever sockets */
val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d444d22..ae35e4f 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -62,6 +62,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
logManager.startup()
socketServer = new SocketServer(config.brokerId,
+ config.hostName,
config.port,
config.numNetworkThreads,
config.numQueuedRequests,
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index e1c11f2..42f8239 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -21,6 +21,7 @@ import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import kafka.common._
+import java.net.InetAddress
/**
@@ -41,8 +42,11 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
}
private def registerBrokerInZk() {
- info("Registering broker " + brokerIdPath)
- val hostName = config.hostName
+ val hostName =
+ if(config.hostName == null || config.hostName.trim.isEmpty)
+ InetAddress.getLocalHost.getCanonicalHostName
+ else
+ config.hostName
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 358c4fd..eabedd0 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -189,7 +189,7 @@ object ZkUtils extends Logging {
case e: ZkNodeExistsException =>
throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
}
- info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+ info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
}
def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c6d41102/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 3b5ec7f..7395cbc 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -34,6 +34,7 @@ import kafka.message.ByteBufferMessageSet
class SocketServerTest extends JUnitSuite {
val server: SocketServer = new SocketServer(0,
+ host = null,
port = TestUtils.choosePort,
numProcessorThreads = 1,
maxQueuedRequests = 50,