You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/25 23:27:19 UTC
kafka git commit: HOTFIX: Fix verbose logging in ControllerChannelManager.brokerReady
Repository: kafka
Updated Branches:
refs/heads/trunk 4cdb96f4c -> 53651937f
HOTFIX: Fix verbose logging in ControllerChannelManager.brokerReady
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1786 from hachikuji/hotfix-ctrlchannelmgr-verbose-logging
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53651937
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53651937
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53651937
Branch: refs/heads/trunk
Commit: 53651937fa8a7750daf44d7e88e616f01e2e37d3
Parents: 4cdb96f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Aug 26 00:05:22 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Aug 26 00:05:22 2016 +0100
----------------------------------------------------------------------
.../controller/ControllerChannelManager.scala | 9 +++++----
.../kafka/utils/NetworkClientBlockingOps.scala | 18 ++++++++++++++----
2 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/53651937/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index c46a536..03cd98c 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -226,12 +226,13 @@ class RequestSendThread(val controllerId: Int,
private def brokerReady(): Boolean = {
import NetworkClientBlockingOps._
try {
- val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)
+ if (!networkClient.isReady(brokerNode)(time)) {
+ if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time))
+ throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
- if (!ready)
- throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
+ info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString()))
+ }
- info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString()))
true
} catch {
case e: Throwable =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/53651937/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index 9aca663..9b0828f 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -45,6 +45,19 @@ object NetworkClientBlockingOps {
class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
/**
+ * Checks whether the node is currently connected, first calling `client.poll` to ensure that any pending
+ * disconnects have been processed.
+ *
+ * This method can be used to check the status of a connection prior to calling `blockingReady` to be able
+ * to tell whether the latter completed a new connection.
+ */
+ def isReady(node: Node)(implicit time: JTime): Boolean = {
+ val currentTime = time.milliseconds()
+ client.poll(0, currentTime)
+ client.isReady(node, currentTime)
+ }
+
+ /**
* Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
* invocations until the connection to `node` is ready, the timeout expires or the connection fails.
*
@@ -77,10 +90,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
}
}
- // poll once to receive pending disconnects
- client.poll(0, startTime)
-
- client.ready(node, startTime) || awaitReady(startTime)
+ isReady(node) || client.ready(node, startTime) || awaitReady(startTime)
}
/**