You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/24 21:12:12 UTC
[kafka] branch 1.0 updated: KAFKA-6052; Fix WriteTxnMarkers request retry issue in InterBrokerSendThread (#4705)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new a6c8259 KAFKA-6052; Fix WriteTxnMarkers request retry issue in InterBrokerSendThread (#4705)
a6c8259 is described below
commit a6c8259329eb76edea5f306178e1e5adc0f20f04
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Sat Mar 24 14:04:08 2018 -0700
KAFKA-6052; Fix WriteTxnMarkers request retry issue in InterBrokerSendThread (#4705)
This resolves an issue, observed when running the brokers on Windows, that prevents the coordinator from sending WriteTxnMarkers requests to complete a transaction.
---
.../scala/kafka/common/InterBrokerSendThread.scala | 158 +++++++++++++++++----
.../TransactionMarkerChannelManager.scala | 2 +
.../kafka/common/InterBrokerSendThreadTest.scala | 70 ++++++++-
3 files changed, 193 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 70dae35..c65e557 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -16,13 +16,17 @@
*/
package kafka.common
+import java.util.{ArrayDeque, ArrayList, Collection, Collections, HashMap, Iterator}
+import java.util.Map.Entry
+
import kafka.utils.ShutdownableThread
-import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
import org.apache.kafka.common.Node
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.utils.Time
+import scala.collection.JavaConverters._
/**
* Class for inter-broker send thread that utilize a non-blocking network client.
@@ -34,6 +38,10 @@ abstract class InterBrokerSendThread(name: String,
extends ShutdownableThread(name, isInterruptible) {
def generateRequests(): Iterable[RequestAndCompletionHandler]
+ def unsentExpiryMs: Int
+ private val unsentRequests = new UnsentRequests
+
+ def hasUnsentRequests = unsentRequests.iterator().hasNext
override def shutdown(): Unit = {
initiateShutdown()
@@ -43,35 +51,21 @@ abstract class InterBrokerSendThread(name: String,
}
override def doWork() {
- val now = time.milliseconds()
- var pollTimeout = Long.MaxValue
+ var now = time.milliseconds()
+
+ generateRequests().foreach { request =>
+ val completionHandler = request.handler
+ unsentRequests.put(request.destination,
+ networkClient.newClientRequest(request.destination.idString, request.request, now, true, completionHandler))
+ }
try {
- for (request: RequestAndCompletionHandler <- generateRequests()) {
- val destination = Integer.toString(request.destination.id())
- val completionHandler = request.handler
- val clientRequest = networkClient.newClientRequest(destination,
- request.request,
- now,
- true,
- completionHandler)
-
- if (networkClient.ready(request.destination, now)) {
- networkClient.send(clientRequest, now)
- } else {
- val header = clientRequest.makeHeader(request.request.latestAllowedVersion)
- val disconnectResponse: ClientResponse = new ClientResponse(header, completionHandler, destination,
- now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ , null /* versionMismatch */ ,
- null /* responseBody */)
-
- // poll timeout would be the minimum of connection delay if there are any dest yet to be reached;
- // otherwise it is infinity
- pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now))
-
- completionHandler.onComplete(disconnectResponse)
- }
- }
- networkClient.poll(pollTimeout, now)
+ val timeout = sendRequests(now)
+ networkClient.poll(timeout, now)
+ now = time.milliseconds()
+ checkDisconnects(now)
+ failExpiredRequests(now)
+ unsentRequests.clean()
} catch {
case e: FatalExitError => throw e
case t: Throwable =>
@@ -84,9 +78,113 @@ abstract class InterBrokerSendThread(name: String,
}
}
- def wakeup(): Unit = networkClient.wakeup()
+ private def sendRequests(now: Long): Long = {
+ var pollTimeout = Long.MaxValue
+ for (node <- unsentRequests.nodes.asScala) {
+ val requestIterator = unsentRequests.requestIterator(node)
+ while (requestIterator.hasNext) {
+ val request = requestIterator.next
+ if (networkClient.ready(node, now)) {
+ networkClient.send(request, now)
+ requestIterator.remove()
+ } else
+ pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(node, now))
+ }
+ }
+ pollTimeout
+ }
+
+ private def checkDisconnects(now: Long): Unit = {
+ // any disconnects affecting requests that have already been transmitted will be handled
+ // by NetworkClient, so we just need to check whether connections for any of the unsent
+ // requests have been disconnected; if they have, then we complete the corresponding future
+ // and set the disconnect flag in the ClientResponse
+ val iterator = unsentRequests.iterator()
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val (node, requests) = (entry.getKey, entry.getValue)
+ if (!requests.isEmpty && networkClient.connectionFailed(node)) {
+ iterator.remove()
+ for (request <- requests.asScala) {
+ if (networkClient.authenticationException(node) != null)
+ error(s"Failed to send the following request due to authentication error: $request")
+ completeWithDisconnect(request, now)
+ }
+ }
+ }
+ }
+ private def failExpiredRequests(now: Long): Unit = {
+ // clear all expired unsent requests
+ val expiredRequests = unsentRequests.removeExpiredRequests(now, unsentExpiryMs)
+ for (request <- expiredRequests.asScala) {
+ debug(s"Failed to send the following request after $unsentExpiryMs ms: $request")
+ completeWithDisconnect(request, now)
+ }
+ }
+
+ def completeWithDisconnect(request: ClientRequest, now: Long): Unit = {
+ val handler = request.callback
+ handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()),
+ handler, request.destination, now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ ,
+ null /* versionMismatch */ , null /* responseBody */))
+ }
+
+ def wakeup(): Unit = networkClient.wakeup()
}
case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest],
- handler: RequestCompletionHandler)
\ No newline at end of file
+ handler: RequestCompletionHandler)
+
+private class UnsentRequests {
+ private val unsent = new HashMap[Node, ArrayDeque[ClientRequest]]
+
+ def put(node: Node, request: ClientRequest): Unit = {
+ var requests = unsent.get(node)
+ if (requests == null) {
+ requests = new ArrayDeque[ClientRequest]
+ unsent.put(node, requests)
+ }
+ requests.add(request)
+ }
+
+ def removeExpiredRequests(now: Long, unsentExpiryMs: Long): Collection[ClientRequest] = {
+ val expiredRequests = new ArrayList[ClientRequest]
+ for (requests <- unsent.values.asScala) {
+ val requestIterator = requests.iterator
+ var foundExpiredRequest = false
+ while (requestIterator.hasNext && !foundExpiredRequest) {
+ val request = requestIterator.next
+ if (request.createdTimeMs < now - unsentExpiryMs) {
+ expiredRequests.add(request)
+ requestIterator.remove()
+ foundExpiredRequest = true
+ }
+ }
+ }
+ expiredRequests
+ }
+
+ def clean(): Unit = {
+ val iterator = unsent.values.iterator
+ while (iterator.hasNext) {
+ val requests = iterator.next
+ if (requests.isEmpty)
+ iterator.remove()
+ }
+ }
+
+ def iterator(): Iterator[Entry[Node, ArrayDeque[ClientRequest]]] = {
+ unsent.entrySet().iterator()
+ }
+
+ def requestIterator(node: Node): Iterator[ClientRequest] = {
+ val requests = unsent.get(node)
+ if (requests == null)
+ Collections.emptyIterator[ClientRequest]
+ else
+ requests.iterator
+ }
+
+ def nodes = unsent.keySet
+}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index fa9d2c3..7059ced 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -135,6 +135,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]()
+ override val unsentExpiryMs: Int = config.requestTimeoutMs
+
newGauge(
"UnknownDestinationQueueSize",
new Gauge[Int] {
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index c6ebdd1..7106866 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -16,14 +16,15 @@
*/
package kafka.common
-import org.junit.{Assert, Test}
import kafka.utils.MockTime
import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.utils.Utils
import org.easymock.EasyMock
+import org.junit.{Assert, Test}
import scala.collection.mutable
@@ -35,18 +36,20 @@ class InterBrokerSendThreadTest {
@Test
def shouldNotSendAnythingWhenNoRequests(): Unit = {
val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+ override val unsentExpiryMs: Int = 1000
override def generateRequests() = mutable.Iterable.empty
}
// poll is always called but there should be no further invocations on NetworkClient
EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
- .andReturn(Utils.mkList())
+ .andReturn(Utils.mkList())
EasyMock.replay(networkClient)
sendThread.doWork()
EasyMock.verify(networkClient)
+ Assert.assertFalse(completionHandler.executedWithDisconnectedResponse)
}
@Test
@@ -55,6 +58,7 @@ class InterBrokerSendThreadTest {
val node = new Node(1, "", 8080)
val handler = RequestAndCompletionHandler(node, request, completionHandler)
val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+ override val unsentExpiryMs: Int = 1000
override def generateRequests() = List[RequestAndCompletionHandler](handler)
}
@@ -65,10 +69,10 @@ class InterBrokerSendThreadTest {
EasyMock.anyLong(),
EasyMock.eq(true),
EasyMock.same(handler.handler)))
- .andReturn(clientRequest)
+ .andReturn(clientRequest)
EasyMock.expect(networkClient.ready(node, time.milliseconds()))
- .andReturn(true)
+ .andReturn(true)
EasyMock.expect(networkClient.send(clientRequest, time.milliseconds()))
@@ -80,15 +84,16 @@ class InterBrokerSendThreadTest {
sendThread.doWork()
EasyMock.verify(networkClient)
+ Assert.assertFalse(completionHandler.executedWithDisconnectedResponse)
}
-
@Test
def shouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady(): Unit = {
val request = new StubRequestBuilder
val node = new Node(1, "", 8080)
val requestAndCompletionHandler = RequestAndCompletionHandler(node, request, completionHandler)
val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+ override val unsentExpiryMs: Int = 1000
override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
}
@@ -105,17 +110,66 @@ class InterBrokerSendThreadTest {
.andReturn(false)
EasyMock.expect(networkClient.connectionDelay(EasyMock.anyObject(), EasyMock.anyLong()))
- .andReturn(0)
+ .andReturn(0)
EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
.andReturn(Utils.mkList())
+ EasyMock.expect(networkClient.connectionFailed(node))
+ .andReturn(true)
+
+ EasyMock.expect(networkClient.authenticationException(node))
+ .andReturn(new AuthenticationException(""))
+
+ EasyMock.replay(networkClient)
+
+ sendThread.doWork()
+
+ EasyMock.verify(networkClient)
+ Assert.assertTrue(completionHandler.executedWithDisconnectedResponse)
+ }
+
+ @Test
+ def testFailingExpiredRequests(): Unit = {
+ val request = new StubRequestBuilder()
+ val node = new Node(1, "", 8080)
+ val handler = RequestAndCompletionHandler(node, request, completionHandler)
+ val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+ override val unsentExpiryMs: Int = 1000
+ override def generateRequests() = List[RequestAndCompletionHandler](handler)
+ }
+
+ val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true, handler.handler)
+ time.sleep(1500)
+
+ EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+ EasyMock.same(handler.request),
+ EasyMock.eq(time.milliseconds()),
+ EasyMock.eq(true),
+ EasyMock.same(handler.handler)))
+ .andReturn(clientRequest)
+
+ // make the node unready so the request is not cleared
+ EasyMock.expect(networkClient.ready(node, time.milliseconds()))
+ .andReturn(false)
+
+ EasyMock.expect(networkClient.connectionDelay(EasyMock.anyObject(), EasyMock.anyLong()))
+ .andReturn(0)
+
+ EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
+ .andReturn(Utils.mkList())
+
+ // rule out disconnects so the request stays for the expiry check
+ EasyMock.expect(networkClient.connectionFailed(node))
+ .andReturn(false)
+
EasyMock.replay(networkClient)
sendThread.doWork()
EasyMock.verify(networkClient)
- Assert.assertTrue(completionHandler.response.wasDisconnected())
+ Assert.assertFalse(sendThread.hasUnsentRequests)
+ Assert.assertTrue(completionHandler.executedWithDisconnectedResponse)
}
@@ -124,8 +178,10 @@ class InterBrokerSendThreadTest {
}
private class StubCompletionHandler extends RequestCompletionHandler {
+ var executedWithDisconnectedResponse = false
var response: ClientResponse = _
override def onComplete(response: ClientResponse): Unit = {
+ this.executedWithDisconnectedResponse = response.wasDisconnected()
this.response = response
}
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.