You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/02/12 21:30:22 UTC

git commit: Processor thread blocks due to infinite loop during fetch response send; patched by Sriram Subramanian; reviewed by Jun Rao; kafka-756

Updated Branches:
  refs/heads/0.8 879d2692e -> 48745f04d


Processor thread blocks due to infinite loop during fetch response send; patched by Sriram Subramanian; reviewed by Jun Rao; kafka-756


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48745f04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48745f04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48745f04

Branch: refs/heads/0.8
Commit: 48745f04d12988121cb770cbd33ed3ed6fe98cff
Parents: 879d269
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Feb 12 12:29:59 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Feb 12 12:29:59 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchResponse.scala  |    4 +-
 core/src/main/scala/kafka/log/FileMessageSet.scala |   16 +++++++++--
 .../main/scala/kafka/network/SocketServer.scala    |    2 +-
 .../main/scala/kafka/network/Transmission.scala    |   22 +++++++++++---
 4 files changed, 33 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/48745f04/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 94650f1..e528742 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -125,7 +125,7 @@ class TopicDataSend(val topicData: TopicData) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.complete) {
-      written += sends.writeCompletely(channel)
+      written += sends.writeTo(channel)
     }
     sent += written
     written
@@ -220,7 +220,7 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.complete) {
-      written += sends.writeCompletely(channel)
+      written += sends.writeTo(channel)
     }
     sent += written
     written

http://git-wip-us.apache.org/repos/asf/kafka/blob/48745f04/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 5845bb6..ce27a19 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -71,7 +71,7 @@ class FileMessageSet private[kafka](val file: File,
   }
   
   /**
-   * Search forward for the file position of the last offset that is great than or equal to the target offset 
+   * Search forward for the file position of the last offset that is greater than or equal to the target offset
    * and return its physical position. If no such offsets are found, return null.
    */
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
@@ -97,8 +97,18 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * Write some of this set to the given channel, return the amount written
    */
-  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
-    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
+    // Fail fast if the file now holds fewer bytes between start and limit than the recorded _size, i.e. it was truncated.
+    val newSize = scala.math.min(channel.size().toInt, limit) - start
+    if (newSize < _size.get()) {
+      throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
+        .format(file.getAbsolutePath, _size.get(), newSize))
+    }
+    val bytesTransferred = channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+    trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
+      + " bytes requested for transfer : " + scala.math.min(size, sizeInBytes))
+    bytesTransferred
+  }
   
   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.

http://git-wip-us.apache.org/repos/asf/kafka/blob/48745f04/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 8f0053a..d5a24f6 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -340,7 +340,7 @@ private[kafka] class Processor(val id: Int,
     if(responseSend == null)
       throw new IllegalStateException("Registered for write interest but no response attached to key.")
     val written = responseSend.writeTo(socketChannel)
-    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
+    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
     if(responseSend.complete) {
       response.request.updateRequestMetrics()
       key.attach(null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/48745f04/core/src/main/scala/kafka/network/Transmission.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index f87e9d0..2827103 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -89,13 +89,25 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
   private var current = sends
   var totalWritten = 0
 
+  /**
+   *  Writes the pending sends to the channel one after another until a send is left
+   *  incomplete or `complete` becomes true, and returns the bytes written by this call.
+   *  Returning on an incomplete write lets the caller schedule other work in the meantime.
+   */
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete
-    val written = current.head.writeTo(channel)
-    totalWritten += written
-    if(current.head.complete)
-      current = current.tail
-    written
+    var totalWrittenPerCall = 0
+    var sendComplete: Boolean = false
+    do {
+      val written = current.head.writeTo(channel)
+      totalWritten += written
+      totalWrittenPerCall += written
+      sendComplete = current.head.complete
+      if(sendComplete)
+        current = current.tail
+    } while (!complete && sendComplete)
+    trace("Bytes written as part of multisend call : " + totalWrittenPerCall + ", Total bytes written so far : " + totalWritten + ", Expected bytes to write : " + expectedBytesToWrite)
+    totalWrittenPerCall
   }
   
   def complete: Boolean = {