You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/11/04 06:17:29 UTC

git commit: kafka-1733; Producer.send will block indeterminately when broker is unavailable; patched by Marc Chung; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 1ed9cf6d0 -> 58e3f99e2


kafka-1733; Producer.send will block indeterminately when broker is unavailable; patched by Marc Chung; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/58e3f99e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/58e3f99e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/58e3f99e

Branch: refs/heads/trunk
Commit: 58e3f99e244d6c4b9ad4166cc5fcf75561d87dab
Parents: 1ed9cf6
Author: Marc Chung <mchungWgmail.com>
Authored: Mon Nov 3 21:17:21 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 3 21:17:21 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/network/BlockingChannel.scala | 11 +++++++----
 core/src/main/scala/kafka/producer/SyncProducer.scala   |  2 +-
 2 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/58e3f99e/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index eb7bb14..6e2a38e 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -42,7 +42,8 @@ class BlockingChannel( val host: String,
   private var readChannel: ReadableByteChannel = null
   private var writeChannel: GatheringByteChannel = null
   private val lock = new Object()
-  
+  private val connectTimeoutMs = readTimeoutMs
+
   def connect() = lock synchronized  {
     if(!connected) {
       try {
@@ -55,19 +56,21 @@ class BlockingChannel( val host: String,
         channel.socket.setSoTimeout(readTimeoutMs)
         channel.socket.setKeepAlive(true)
         channel.socket.setTcpNoDelay(true)
-        channel.connect(new InetSocketAddress(host, port))
+        channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)
 
         writeChannel = channel
         readChannel = Channels.newChannel(channel.socket().getInputStream)
         connected = true
         // settings may not match what we requested above
-        val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)."
+        val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
         debug(msg.format(channel.socket.getSoTimeout,
                          readTimeoutMs,
                          channel.socket.getReceiveBufferSize, 
                          readBufferSize,
                          channel.socket.getSendBufferSize,
-                         writeBufferSize))
+                         writeBufferSize,
+                         connectTimeoutMs))
+
       } catch {
         case e: Throwable => disconnect()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/58e3f99e/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 42c9503..35e9e8c 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -42,7 +42,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
   val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 
-  trace("Instantiating Scala Sync Producer")
+  trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))
 
   private def verifyRequest(request: RequestOrResponse) = {
     /**