You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC
[27/28] git commit: KAFKA-700 log client ip when we log each request on the broker; reviewed by Neha Narkhede
KAFKA-700 log client ip when we log each request on the broker; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40a80fa7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40a80fa7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40a80fa7
Branch: refs/heads/trunk
Commit: 40a80fa7b7ae3d49e32c40fbaad1a4e402b2ac71
Parents: 22a010b
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Fri Jan 25 13:06:13 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Jan 25 13:06:24 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/network/RequestChannel.scala | 3 ++-
.../main/scala/kafka/network/SocketServer.scala | 5 +++--
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/40a80fa7/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 5185dec..9b0f7e9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,6 +25,7 @@ import kafka.api._
import kafka.common.TopicAndPartition
import kafka.utils.{Logging, SystemTime}
import kafka.message.ByteBufferMessageSet
+import java.net._
object RequestChannel extends Logging {
@@ -39,7 +40,7 @@ object RequestChannel extends Logging {
byteBuffer
}
- case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long) {
+ case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
@volatile var dequeueTimeMs = -1L
@volatile var apiLocalCompleteTimeMs = -1L
@volatile var responseCompleteTimeMs = -1L
http://git-wip-us.apache.org/repos/asf/kafka/blob/40a80fa7/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e5dccd3..b056e25 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -313,11 +313,12 @@ private[kafka] class Processor(val id: Int,
key.attach(receive)
}
val read = receive.readFrom(socketChannel)
- trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
+ val address = socketChannel.socket.getRemoteSocketAddress();
+ trace(read + " bytes read from " + address)
if(read < 0) {
close(key)
} else if(receive.complete) {
- val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds)
+ val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
requestChannel.sendRequest(req)
trace("Received request, sending for processing by handler: " + req)
key.attach(null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/40a80fa7/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a1a11a..6df077b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -55,7 +55,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handle(request: RequestChannel.Request) {
try{
if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling request: %s".format(request.requestObj))
+ requestLogger.trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)