You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/25 23:27:19 UTC

kafka git commit: HOTFIX: Fix verbose logging in ControllerChannelManager.brokerReady

Repository: kafka
Updated Branches:
  refs/heads/trunk 4cdb96f4c -> 53651937f


HOTFIX: Fix verbose logging in ControllerChannelManager.brokerReady

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1786 from hachikuji/hotfix-ctrlchannelmgr-verbose-logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53651937
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53651937
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53651937

Branch: refs/heads/trunk
Commit: 53651937fa8a7750daf44d7e88e616f01e2e37d3
Parents: 4cdb96f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Aug 26 00:05:22 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Aug 26 00:05:22 2016 +0100

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala     |  9 +++++----
 .../kafka/utils/NetworkClientBlockingOps.scala    | 18 ++++++++++++++----
 2 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/53651937/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c46a536..03cd98c 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -226,12 +226,13 @@ class RequestSendThread(val controllerId: Int,
   private def brokerReady(): Boolean = {
     import NetworkClientBlockingOps._
     try {
-      val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)
+      if (!networkClient.isReady(brokerNode)(time)) {
+        if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time))
+          throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
-      if (!ready)
-        throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
+        info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString()))
+      }
 
-      info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString()))
       true
     } catch {
       case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/53651937/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index 9aca663..9b0828f 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -45,6 +45,19 @@ object NetworkClientBlockingOps {
 class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
 
   /**
+    * Checks whether the node is currently connected, first calling `client.poll` to ensure that any pending
+    * disconnects have been processed.
+    *
+    * This method can be used to check the status of a connection prior to calling `blockingReady` to be able
+    * to tell whether the latter completed a new connection.
+    */
+  def isReady(node: Node)(implicit time: JTime): Boolean = {
+    val currentTime = time.milliseconds()
+    client.poll(0, currentTime)
+    client.isReady(node, currentTime)
+  }
+
+  /**
    * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
    * invocations until the connection to `node` is ready, the timeout expires or the connection fails.
    *
@@ -77,10 +90,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
       }
     }
 
-    // poll once to receive pending disconnects
-    client.poll(0, startTime)
-
-    client.ready(node, startTime) || awaitReady(startTime)
+    isReady(node) || client.ready(node, startTime) || awaitReady(startTime)
   }
 
   /**