You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/02/12 21:30:22 UTC
git commit: Processor thread blocks due to infinite loop during fetch
response send; patched by Sriram Subramanian; reviewed by Jun Rao; kafka-756
Updated Branches:
refs/heads/0.8 879d2692e -> 48745f04d
Processor thread blocks due to infinite loop during fetch response send; patched by Sriram Subramanian; reviewed by Jun Rao; kafka-756
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48745f04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48745f04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48745f04
Branch: refs/heads/0.8
Commit: 48745f04d12988121cb770cbd33ed3ed6fe98cff
Parents: 879d269
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Feb 12 12:29:59 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Feb 12 12:29:59 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/api/FetchResponse.scala | 4 +-
core/src/main/scala/kafka/log/FileMessageSet.scala | 16 +++++++++--
.../main/scala/kafka/network/SocketServer.scala | 2 +-
.../main/scala/kafka/network/Transmission.scala | 22 +++++++++++---
4 files changed, 33 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/48745f04/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 94650f1..e528742 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -125,7 +125,7 @@ class TopicDataSend(val topicData: TopicData) extends Send {
if(buffer.hasRemaining)
written += channel.write(buffer)
if(!buffer.hasRemaining && !sends.complete) {
- written += sends.writeCompletely(channel)
+ written += sends.writeTo(channel)
}
sent += written
written
@@ -220,7 +220,7 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
if(buffer.hasRemaining)
written += channel.write(buffer)
if(!buffer.hasRemaining && !sends.complete) {
- written += sends.writeCompletely(channel)
+ written += sends.writeTo(channel)
}
sent += written
written
http://git-wip-us.apache.org/repos/asf/kafka/blob/48745f04/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 5845bb6..ce27a19 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -71,7 +71,7 @@ class FileMessageSet private[kafka](val file: File,
}
/**
- * Search forward for the file position of the last offset that is great than or equal to the target offset
+ * Search forward for the file position of the last offset that is greater than or equal to the target offset
* and return its physical position. If no such offsets are found, return null.
*/
private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
@@ -97,8 +97,18 @@ class FileMessageSet private[kafka](val file: File,
/**
* Write some of this set to the given channel, return the amount written
*/
- def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
- channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+ def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
+ // Ensure that the underlying size has not changed.
+ val newSize = scala.math.min(channel.size().toInt, limit) - start
+ if (newSize < _size.get()) {
+ throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
+ .format(file.getAbsolutePath, _size.get(), newSize))
+ }
+ val bytesTransferred = channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+ trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
+ + " bytes requested for transfer : " + scala.math.min(size, sizeInBytes))
+ bytesTransferred
+ }
/**
* Get an iterator over the messages in the set. We only do shallow iteration here.
http://git-wip-us.apache.org/repos/asf/kafka/blob/48745f04/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 8f0053a..d5a24f6 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -340,7 +340,7 @@ private[kafka] class Processor(val id: Int,
if(responseSend == null)
throw new IllegalStateException("Registered for write interest but no response attached to key.")
val written = responseSend.writeTo(socketChannel)
- trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
+ trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
if(responseSend.complete) {
response.request.updateRequestMetrics()
key.attach(null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/48745f04/core/src/main/scala/kafka/network/Transmission.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index f87e9d0..2827103 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -89,13 +89,25 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
private var current = sends
var totalWritten = 0
+ /**
+ * This method continues to write to the socket buffer till an incomplete
+ * write happens. On an incomplete write, it returns to the caller to give it
+ * a chance to schedule other work till the buffered write completes.
+ */
def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete
- val written = current.head.writeTo(channel)
- totalWritten += written
- if(current.head.complete)
- current = current.tail
- written
+ var totalWrittenPerCall = 0
+ var sendComplete: Boolean = false
+ do {
+ val written = current.head.writeTo(channel)
+ totalWritten += written
+ totalWrittenPerCall += written
+ sendComplete = current.head.complete
+ if(sendComplete)
+ current = current.tail
+ } while (!complete && sendComplete)
+ trace("Bytes written as part of multisend call : " + totalWrittenPerCall + "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + expectedBytesToWrite)
+ totalWrittenPerCall
}
def complete: Boolean = {