You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC

[27/28] git commit: KAFKA-700 log client IP when we log each request on the broker; reviewed by Neha Narkhede

KAFKA-700 log client IP when we log each request on the broker; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40a80fa7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40a80fa7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40a80fa7

Branch: refs/heads/trunk
Commit: 40a80fa7b7ae3d49e32c40fbaad1a4e402b2ac71
Parents: 22a010b
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Fri Jan 25 13:06:13 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Jan 25 13:06:24 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/network/RequestChannel.scala  |    3 ++-
 .../main/scala/kafka/network/SocketServer.scala    |    5 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |    2 +-
 3 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/40a80fa7/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 5185dec..9b0f7e9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,6 +25,7 @@ import kafka.api._
 import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, SystemTime}
 import kafka.message.ByteBufferMessageSet
+import java.net._
 
 
 object RequestChannel extends Logging {
@@ -39,7 +40,7 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long) {
+  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
     @volatile var dequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L

http://git-wip-us.apache.org/repos/asf/kafka/blob/40a80fa7/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e5dccd3..b056e25 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -313,11 +313,12 @@ private[kafka] class Processor(val id: Int,
       key.attach(receive)
     }
     val read = receive.readFrom(socketChannel)
-    trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
+    val address = socketChannel.socket.getRemoteSocketAddress();
+    trace(read + " bytes read from " + address)
     if(read < 0) {
       close(key)
     } else if(receive.complete) {
-      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds)
+      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
       requestChannel.sendRequest(req)
       trace("Received request, sending for processing by handler: " + req)
       key.attach(null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/40a80fa7/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a1a11a..6df077b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -55,7 +55,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handle(request: RequestChannel.Request) {
     try{
       if(requestLogger.isTraceEnabled)
-        requestLogger.trace("Handling request: %s".format(request.requestObj))
+        requestLogger.trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
       request.requestId match {
         case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)