You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:36 UTC
[20/50] [abbrv] kafka git commit: KAFKA-3495;
`NetworkClient.blockingSendAndReceive` should rely on requestTimeout
KAFKA-3495; `NetworkClient.blockingSendAndReceive` should rely on requestTimeout
Also removed the code for handling negative timeouts in `blockingReady` as `Selector.poll` has not supported that for a while.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #1177 from ijuma/kafka-3495-blocking-send-and-receive-request-timeout
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b94a7812
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b94a7812
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b94a7812
Branch: refs/heads/0.10.0
Commit: b94a7812beeb840926871df84838f35ca7b76ffe
Parents: ef3f053
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sun Apr 3 16:34:46 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Apr 3 16:34:46 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 2 +-
.../controller/ControllerChannelManager.scala | 4 +-
.../main/scala/kafka/server/KafkaServer.scala | 9 +--
.../kafka/server/ReplicaFetcherThread.scala | 4 +-
.../kafka/utils/NetworkClientBlockingOps.scala | 66 +++++++++++---------
5 files changed, 40 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b94a7812/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4d01cde..d22b508 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -390,7 +390,7 @@ public class NetworkClient implements KafkaClient {
}
/**
- * Iterate over all the inflight requests and expire any requests that have exceeded the configured the requestTimeout.
+ * Iterate over all the inflight requests and expire any requests that have exceeded the configured requestTimeout.
* The connection to the node associated with the request will be terminated and will be treated as a disconnection.
*
* @param responses The list of responses to update
http://git-wip-us.apache.org/repos/asf/kafka/blob/b94a7812/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b376d15..e9731fd 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -178,9 +178,7 @@ class RequestSendThread(val controllerId: Int,
val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
- clientResponse = networkClient.blockingSendAndReceive(clientRequest, socketTimeoutMs)(time).getOrElse {
- throw new SocketTimeoutException(s"No response received within $socketTimeoutMs ms")
- }
+ clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
isSendSuccessful = true
}
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b94a7812/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e29494b..f998d82 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -320,9 +320,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val socketTimeoutMs = config.controllerSocketTimeoutMs
- def socketTimeoutException: Throwable =
- new SocketTimeoutException(s"Did not receive response within $socketTimeoutMs")
-
def networkClientControlledShutdown(retries: Int): Boolean = {
val metadataUpdater = new ManualMetadataUpdater()
val networkClient = {
@@ -388,16 +385,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
try {
if (!networkClient.blockingReady(node(prevController), socketTimeoutMs))
- throw socketTimeoutException
+ throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
// send the controlled shutdown request
val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
val send = new RequestSend(node(prevController).idString, requestHeader,
new ControlledShutdownRequest(config.brokerId).toStruct)
val request = new ClientRequest(kafkaMetricsTime.milliseconds(), true, send, null)
- val clientResponse = networkClient.blockingSendAndReceive(request, socketTimeoutMs).getOrElse {
- throw socketTimeoutException
- }
+ val clientResponse = networkClient.blockingSendAndReceive(request)
val shutdownResponse = new ControlledShutdownResponse(clientResponse.responseBody)
if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b94a7812/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index de7269f..26838ca 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -233,9 +233,7 @@ class ReplicaFetcherThread(name: String,
else {
val send = new RequestSend(sourceBroker.id.toString, header, request.toStruct)
val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
- networkClient.blockingSendAndReceive(clientRequest, socketTimeout)(time).getOrElse {
- throw new SocketTimeoutException(s"No response received within $socketTimeout ms")
- }
+ networkClient.blockingSendAndReceive(clientRequest)(time)
}
}
catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b94a7812/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index 9ed9d29..fd4af6e 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -55,6 +55,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
* care.
*/
def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
+ require(timeout >=0, "timeout should be >= 0")
client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
if (client.isReady(node, now))
true
@@ -65,19 +66,18 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
}
/**
- * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received,
- * the timeout expires or a disconnection happens.
+ * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
+ * disconnection happens (which can happen for a number of reasons including a request timeout).
*
- * It returns `true` if the call completes normally or `false` if the timeout expires. In the case of a disconnection,
- * an `IOException` is thrown instead.
+ * In case of a disconnection, an `IOException` is thrown.
*
* This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
* care.
*/
- def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
+ def blockingSendAndReceive(request: ClientRequest)(implicit time: JTime): ClientResponse = {
client.send(request, time.milliseconds())
- pollUntilFound(timeout) { case (responses, _) =>
+ pollContinuously { responses =>
val response = responses.find { response =>
response.request.request.header.correlationId == request.request.header.correlationId
}
@@ -102,41 +102,45 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
* care.
*/
private def pollUntil(timeout: Long)(predicate: (Seq[ClientResponse], Long) => Boolean)(implicit time: JTime): Boolean = {
- pollUntilFound(timeout) { (responses, now) =>
- if (predicate(responses, now)) Some(true)
- else None
- }.fold(false)(_ => true)
- }
-
- /**
- * Invokes `client.poll` until `collect` returns `Some` or the timeout expires.
- *
- * It returns the result of `collect` if the call completes normally or `None` if the timeout expires. Exceptions
- * thrown via `collect` are not handled and will bubble up.
- *
- * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
- * care.
- */
- private def pollUntilFound[T](timeout: Long)(collect: (Seq[ClientResponse], Long) => Option[T])(implicit time: JTime): Option[T] = {
-
val methodStartTime = time.milliseconds()
val timeoutExpiryTime = methodStartTime + timeout
@tailrec
- def recurse(iterationStartTime: Long): Option[T] = {
- val pollTimeout = if (timeout < 0) timeout else timeoutExpiryTime - iterationStartTime
+ def recursivePoll(iterationStartTime: Long): Boolean = {
+ val pollTimeout = timeoutExpiryTime - iterationStartTime
val responses = client.poll(pollTimeout, iterationStartTime).asScala
- val result = collect(responses, iterationStartTime)
- if (result.isDefined) result
+ if (predicate(responses, iterationStartTime)) true
else {
val afterPollTime = time.milliseconds()
- if (timeout < 0 || afterPollTime < timeoutExpiryTime)
- recurse(afterPollTime)
- else None
+ if (afterPollTime < timeoutExpiryTime) recursivePoll(afterPollTime)
+ else false
+ }
+ }
+
+ recursivePoll(methodStartTime)
+ }
+
+ /**
+ * Invokes `client.poll` until `collect` returns `Some`. The value inside `Some` is returned.
+ *
+ * Exceptions thrown via `collect` are not handled and will bubble up.
+ *
+ * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+ * care.
+ */
+ private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: JTime): T = {
+
+ @tailrec
+ def recursivePoll: T = {
+ // rely on request timeout to ensure we don't block forever
+ val responses = client.poll(Long.MaxValue, time.milliseconds()).asScala
+ collect(responses) match {
+ case Some(result) => result
+ case None => recursivePoll
}
}
- recurse(methodStartTime)
+ recursivePoll
}
}