You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/04 06:17:29 UTC
git commit: kafka-1733;
Producer.send will block indefinitely when broker is unavailable;
patched by Marc Chung; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 1ed9cf6d0 -> 58e3f99e2
kafka-1733; Producer.send will block indefinitely when broker is unavailable; patched by Marc Chung; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/58e3f99e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/58e3f99e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/58e3f99e
Branch: refs/heads/trunk
Commit: 58e3f99e244d6c4b9ad4166cc5fcf75561d87dab
Parents: 1ed9cf6
Author: Marc Chung <mchung@gmail.com>
Authored: Mon Nov 3 21:17:21 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 3 21:17:21 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/network/BlockingChannel.scala | 11 +++++++----
core/src/main/scala/kafka/producer/SyncProducer.scala | 2 +-
2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/58e3f99e/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index eb7bb14..6e2a38e 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -42,7 +42,8 @@ class BlockingChannel( val host: String,
private var readChannel: ReadableByteChannel = null
private var writeChannel: GatheringByteChannel = null
private val lock = new Object()
-
+ private val connectTimeoutMs = readTimeoutMs
+
def connect() = lock synchronized {
if(!connected) {
try {
@@ -55,19 +56,21 @@ class BlockingChannel( val host: String,
channel.socket.setSoTimeout(readTimeoutMs)
channel.socket.setKeepAlive(true)
channel.socket.setTcpNoDelay(true)
- channel.connect(new InetSocketAddress(host, port))
+ channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
writeChannel = channel
readChannel = Channels.newChannel(channel.socket().getInputStream)
connected = true
// settings may not match what we requested above
- val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
+ val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
debug(msg.format(channel.socket.getSoTimeout,
readTimeoutMs,
channel.socket.getReceiveBufferSize,
readBufferSize,
channel.socket.getSendBufferSize,
- writeBufferSize))
+ writeBufferSize,
+ connectTimeoutMs))
+
} catch {
case e: Throwable => disconnect()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/58e3f99e/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 42c9503..35e9e8c 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -42,7 +42,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
- trace("Instantiating Scala Sync Producer")
+ trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))
private def verifyRequest(request: RequestOrResponse) = {
/**