You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/24 21:10:05 UTC

[kafka] branch 1.1 updated: KAFKA-6052; Fix WriteTxnMarkers request retry issue in InterBrokerSendThread (#4705)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 89b2df9  KAFKA-6052; Fix WriteTxnMarkers request retry issue in InterBrokerSendThread (#4705)
89b2df9 is described below

commit 89b2df944556b7b1375ad2eac3b9be6aea5cb51b
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Sat Mar 24 14:04:08 2018 -0700

    KAFKA-6052; Fix WriteTxnMarkers request retry issue in InterBrokerSendThread (#4705)
    
    This resolves an issue, seen when running the brokers on Windows, that prevented the coordinator from sending WriteTxnMarkers requests to complete a transaction.
---
 .../scala/kafka/common/InterBrokerSendThread.scala | 158 +++++++++++++++++----
 .../TransactionMarkerChannelManager.scala          |   2 +
 .../kafka/common/InterBrokerSendThreadTest.scala   |  70 ++++++++-
 3 files changed, 193 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 70dae35..c65e557 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -16,13 +16,17 @@
  */
 package kafka.common
 
+import java.util.{ArrayDeque, ArrayList, Collection, Collections, HashMap, Iterator}
+import java.util.Map.Entry
+
 import kafka.utils.ShutdownableThread
-import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConverters._
 
 /**
  *  Class for inter-broker send thread that utilize a non-blocking network client.
@@ -34,6 +38,10 @@ abstract class InterBrokerSendThread(name: String,
   extends ShutdownableThread(name, isInterruptible) {
 
   def generateRequests(): Iterable[RequestAndCompletionHandler]
+  def unsentExpiryMs: Int // requests left unsent for longer than this (ms) are failed as disconnected
+  private val unsentRequests = new UnsentRequests
+
+  def hasUnsentRequests: Boolean = unsentRequests.iterator().hasNext // true while any node entry exists
 
   override def shutdown(): Unit = {
     initiateShutdown()
@@ -43,35 +51,21 @@ abstract class InterBrokerSendThread(name: String,
   }
 
   override def doWork() {
-    val now = time.milliseconds()
-    var pollTimeout = Long.MaxValue
+    var now = time.milliseconds()
+
+    generateRequests().foreach { request =>
+      val completionHandler = request.handler
+      unsentRequests.put(request.destination,
+        networkClient.newClientRequest(request.destination.idString, request.request, now, true, completionHandler))
+    }
 
     try {
-      for (request: RequestAndCompletionHandler <- generateRequests()) {
-        val destination = Integer.toString(request.destination.id())
-        val completionHandler = request.handler
-        val clientRequest = networkClient.newClientRequest(destination,
-          request.request,
-          now,
-          true,
-          completionHandler)
-
-        if (networkClient.ready(request.destination, now)) {
-          networkClient.send(clientRequest, now)
-        } else {
-          val header = clientRequest.makeHeader(request.request.latestAllowedVersion)
-          val disconnectResponse: ClientResponse = new ClientResponse(header, completionHandler, destination,
-            now /* createdTimeMs */ , now /* receivedTimeMs */ , true /* disconnected */ , null /* versionMismatch */ ,
-            null /* responseBody */)
-
-          // poll timeout would be the minimum of connection delay if there are any dest yet to be reached;
-          // otherwise it is infinity
-          pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now))
-
-          completionHandler.onComplete(disconnectResponse)
-        }
-      }
-      networkClient.poll(pollTimeout, now)
+      val timeout = sendRequests(now)
+      networkClient.poll(timeout, now)
+      now = time.milliseconds()
+      checkDisconnects(now)
+      failExpiredRequests(now)
+      unsentRequests.clean()
     } catch {
       case e: FatalExitError => throw e
       case t: Throwable =>
@@ -84,9 +78,113 @@ abstract class InterBrokerSendThread(name: String,
     }
   }
 
-  def wakeup(): Unit = networkClient.wakeup()
+  /** Sends queued requests to ready nodes; returns min connection delay of the rest (or Long.MaxValue). */
+  private def sendRequests(now: Long): Long = {
+    var pollTimeout = Long.MaxValue
+    for (node <- unsentRequests.nodes.asScala) {
+      val requestIterator = unsentRequests.requestIterator(node)
+      while (requestIterator.hasNext && networkClient.ready(node, now)) {
+        networkClient.send(requestIterator.next, now)
+        requestIterator.remove()
+      }
+      // requests left over mean the node is not ready: ask for its delay once, not once per request
+      if (requestIterator.hasNext)
+        pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(node, now))
+    }
+    pollTimeout
+  }
+
+  private def checkDisconnects(now: Long): Unit = {
+    // Disconnects affecting requests that were already transmitted are handled by NetworkClient, so
+    // here we only fail the still-unsent requests of nodes whose connection has failed, completing
+    // each one with a ClientResponse whose disconnect flag is set.
+    val iterator = unsentRequests.iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next
+      val (node, requests) = (entry.getKey, entry.getValue)
+      if (!requests.isEmpty && networkClient.connectionFailed(node)) {
+        iterator.remove()
+        val authenticationFailed = networkClient.authenticationException(node) != null // once per node
+        for (request <- requests.asScala) {
+          if (authenticationFailed)
+            error(s"Failed to send the following request due to authentication error: $request")
+          completeWithDisconnect(request, now)
+        }
+      }
+    }
+  }
 
+  /** Completes, as disconnected, every request that `removeExpiredRequests` drops as expired. */
+  private def failExpiredRequests(now: Long): Unit = {
+    val expiredRequests = unsentRequests.removeExpiredRequests(now, unsentExpiryMs).asScala
+    expiredRequests.foreach { expired =>
+      debug(s"Failed to send the following request after $unsentExpiryMs ms: $expired")
+      completeWithDisconnect(expired, now)
+    }
+  }
+
+  def completeWithDisconnect(request: ClientRequest, now: Long): Unit = {
+    // answer the request locally: created = received = now, disconnected, no version mismatch, no body
+    val header = request.makeHeader(request.requestBuilder().latestAllowedVersion())
+    val response = new ClientResponse(header, request.callback, request.destination, now, now, true, null, null)
+    request.callback.onComplete(response)
+  }
+
+  def wakeup(): Unit = networkClient.wakeup() // forwards to the underlying NetworkClient
 }
 
 case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                       handler: RequestCompletionHandler)
\ No newline at end of file
+                                       handler: RequestCompletionHandler) // request for `destination` + its callback
+
+/** Client requests, keyed by destination node, not yet passed to `NetworkClient.send`. */
+private class UnsentRequests {
+  private val unsent = new HashMap[Node, ArrayDeque[ClientRequest]]
+
+  /** Appends `request` to the queue for `node`, creating that queue on first use. */
+  def put(node: Node, request: ClientRequest): Unit = {
+    var requests = unsent.get(node)
+    if (requests == null) {
+      requests = new ArrayDeque[ClientRequest]
+      unsent.put(node, requests)
+    }
+    requests.add(request)
+  }
+
+  /** Removes and returns every request created more than `unsentExpiryMs` ms before `now`. */
+  def removeExpiredRequests(now: Long, unsentExpiryMs: Long): Collection[ClientRequest] = {
+    val expiredRequests = new ArrayList[ClientRequest]
+    for (requests <- unsent.values.asScala) {
+      // scan the whole queue, so all expired requests of a node are failed in the same pass
+      val requestIterator = requests.iterator
+      while (requestIterator.hasNext) {
+        val request = requestIterator.next
+        if (request.createdTimeMs < now - unsentExpiryMs) {
+          expiredRequests.add(request)
+          requestIterator.remove()
+        }
+      }
+    }
+    expiredRequests
+  }
+
+  /** Drops the entries of nodes whose queues are now empty. */
+  def clean(): Unit = {
+    val iterator = unsent.values.iterator
+    while (iterator.hasNext) {
+      if (iterator.next.isEmpty)
+        iterator.remove()
+    }
+  }
+
+  /** Iterator over (node, queue) entries; its `remove()` drops the node's whole entry. */
+  def iterator(): Iterator[Entry[Node, ArrayDeque[ClientRequest]]] = unsent.entrySet().iterator()
+
+  /** Iterator over the requests queued for `node`, or an empty iterator if there are none. */
+  def requestIterator(node: Node): Iterator[ClientRequest] = {
+    val requests = unsent.get(node)
+    if (requests == null) Collections.emptyIterator[ClientRequest] else requests.iterator
+  }
+
+  /** Nodes that currently have an entry; the queue may be empty until `clean()` runs. */
+  def nodes: java.util.Set[Node] = unsent.keySet
+}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index fa9d2c3..7059ced 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -135,6 +135,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
   private val txnLogAppendRetryQueue = new LinkedBlockingQueue[TxnLogAppend]()
 
+  override val unsentExpiryMs: Int = config.requestTimeoutMs // unsent markers expire after the request timeout
+
   newGauge(
     "UnknownDestinationQueueSize",
     new Gauge[Int] {
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index c6ebdd1..7106866 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -16,14 +16,15 @@
  */
 package kafka.common
 
-import org.junit.{Assert, Test}
 import kafka.utils.MockTime
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.errors.AuthenticationException
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.utils.Utils
 import org.easymock.EasyMock
+import org.junit.{Assert, Test}
 
 import scala.collection.mutable
 
@@ -35,18 +36,20 @@ class InterBrokerSendThreadTest {
   @Test
   def shouldNotSendAnythingWhenNoRequests(): Unit = {
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override val unsentExpiryMs: Int = 1000
       override def generateRequests() = mutable.Iterable.empty
     }
 
     // poll is always called but there should be no further invocations on NetworkClient
     EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
-    .andReturn(Utils.mkList())
+      .andReturn(Utils.mkList())
 
     EasyMock.replay(networkClient)
 
     sendThread.doWork()
 
     EasyMock.verify(networkClient)
+    Assert.assertFalse(completionHandler.executedWithDisconnectedResponse)
   }
 
   @Test
@@ -55,6 +58,7 @@ class InterBrokerSendThreadTest {
     val node = new Node(1, "", 8080)
     val handler = RequestAndCompletionHandler(node, request, completionHandler)
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override val unsentExpiryMs: Int = 1000
       override def generateRequests() = List[RequestAndCompletionHandler](handler)
     }
 
@@ -65,10 +69,10 @@ class InterBrokerSendThreadTest {
       EasyMock.anyLong(),
       EasyMock.eq(true),
       EasyMock.same(handler.handler)))
-    .andReturn(clientRequest)
+      .andReturn(clientRequest)
 
     EasyMock.expect(networkClient.ready(node, time.milliseconds()))
-    .andReturn(true)
+      .andReturn(true)
 
     EasyMock.expect(networkClient.send(clientRequest, time.milliseconds()))
 
@@ -80,15 +84,16 @@ class InterBrokerSendThreadTest {
     sendThread.doWork()
 
     EasyMock.verify(networkClient)
+    Assert.assertFalse(completionHandler.executedWithDisconnectedResponse)
   }
 
-
   @Test
   def shouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady(): Unit = {
     val request = new StubRequestBuilder
     val node = new Node(1, "", 8080)
     val requestAndCompletionHandler = RequestAndCompletionHandler(node, request, completionHandler)
     val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override val unsentExpiryMs: Int = 1000
       override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
     }
 
@@ -105,17 +110,66 @@ class InterBrokerSendThreadTest {
       .andReturn(false)
 
     EasyMock.expect(networkClient.connectionDelay(EasyMock.anyObject(), EasyMock.anyLong()))
-    .andReturn(0)
+      .andReturn(0)
 
     EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
       .andReturn(Utils.mkList())
 
+    EasyMock.expect(networkClient.connectionFailed(node))
+      .andReturn(true)
+
+    EasyMock.expect(networkClient.authenticationException(node))
+      .andReturn(new AuthenticationException(""))
+
+    EasyMock.replay(networkClient)
+
+    sendThread.doWork()
+
+    EasyMock.verify(networkClient)
+    Assert.assertTrue(completionHandler.executedWithDisconnectedResponse)
+  }
+
+  @Test
+  def testFailingExpiredRequests(): Unit = {
+    val request = new StubRequestBuilder()
+    val node = new Node(1, "", 8080)
+    val handler = RequestAndCompletionHandler(node, request, completionHandler)
+    val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override val unsentExpiryMs: Int = 1000
+      override def generateRequests() = List[RequestAndCompletionHandler](handler)
+    }
+
+    val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true, handler.handler)
+    time.sleep(1500)
+
+    EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+      EasyMock.same(handler.request),
+      EasyMock.eq(time.milliseconds()),
+      EasyMock.eq(true),
+      EasyMock.same(handler.handler)))
+      .andReturn(clientRequest)
+
+    // make the node unready so the request is not cleared
+    EasyMock.expect(networkClient.ready(node, time.milliseconds()))
+      .andReturn(false)
+
+    EasyMock.expect(networkClient.connectionDelay(EasyMock.anyObject(), EasyMock.anyLong()))
+      .andReturn(0)
+
+    EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
+      .andReturn(Utils.mkList())
+
+    // rule out disconnects so the request stays for the expiry check
+    EasyMock.expect(networkClient.connectionFailed(node))
+      .andReturn(false)
+
     EasyMock.replay(networkClient)
 
     sendThread.doWork()
 
     EasyMock.verify(networkClient)
-    Assert.assertTrue(completionHandler.response.wasDisconnected())
+    Assert.assertFalse(sendThread.hasUnsentRequests)
+    Assert.assertTrue(completionHandler.executedWithDisconnectedResponse)
   }
 
 
@@ -124,8 +178,10 @@ class InterBrokerSendThreadTest {
   }
 
   private class StubCompletionHandler extends RequestCompletionHandler {
+    var executedWithDisconnectedResponse = false // overwritten on each onComplete: reflects the latest response
     var response: ClientResponse = _
     override def onComplete(response: ClientResponse): Unit = {
+      this.executedWithDisconnectedResponse = response.wasDisconnected()
       this.response = response
     }
   }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.