You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/26 22:49:39 UTC

kafka git commit: KAFKA-1683; persisting session information in Requests

Repository: kafka
Updated Branches:
  refs/heads/trunk 436b7ddc3 -> 8b538d62b


KAFKA-1683; persisting session information in Requests

Author: Gwen Shapira <cs...@gmail.com>

Reviewers: Sriharsha Chintalapa, Ismael Juma, Edward Ribeiro, Parth Brahmbhatt, Jun Rao

Closes #155 from gwenshap/KAFKA-1683


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b538d62
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b538d62
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b538d62

Branch: refs/heads/trunk
Commit: 8b538d62bd7ddcefd177bfed01b10b39fc971bb1
Parents: 436b7dd
Author: Gwen Shapira <cs...@gmail.com>
Authored: Wed Aug 26 13:49:33 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Aug 26 13:49:33 2015 -0700

----------------------------------------------------------------------
 .../common/network/PlaintextTransportLayer.java  |  2 +-
 .../kafka/common/network/SSLTransportLayer.java  |  4 +++-
 .../apache/kafka/common/network/Selector.java    |  7 +++++--
 .../common/security/auth/KafkaPrincipal.java     |  1 +
 .../scala/kafka/network/RequestChannel.scala     | 16 ++++++++++------
 .../main/scala/kafka/network/SocketServer.scala  |  5 ++++-
 core/src/main/scala/kafka/server/KafkaApis.scala |  3 ++-
 .../unit/kafka/network/SocketServerTest.scala    | 19 ++++++++++---------
 8 files changed, 36 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index a3567af..35d4168 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -37,7 +37,7 @@ public class PlaintextTransportLayer implements TransportLayer {
     private static final Logger log = LoggerFactory.getLogger(PlaintextTransportLayer.class);
     private final SelectionKey key;
     private final SocketChannel socketChannel;
-    private final Principal principal = new KafkaPrincipal("ANONYMOUS");
+    private final Principal principal = KafkaPrincipal.ANONYMOUS;
 
     public PlaintextTransportLayer(SelectionKey key) throws IOException {
         this.key = key;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
index 8ba1b01..f1cd607 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
@@ -33,6 +33,7 @@ import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLSession;
 import javax.net.ssl.SSLPeerUnverifiedException;
 
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -600,7 +601,8 @@ public class SSLTransportLayer implements TransportLayer {
         try {
             return sslEngine.getSession().getPeerPrincipal();
         } catch (SSLPeerUnverifiedException se) {
-            throw new IOException(String.format("Unable to retrieve getPeerPrincipal due to %s", se));
+            log.warn("SSL peer is not authenticated, returning ANONYMOUS instead");
+            return KafkaPrincipal.ANONYMOUS;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 12c911c..f49d54c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -465,6 +465,7 @@ public class Selector implements Selectable {
         this.sensors.connectionClosed.record();
     }
 
+
     /**
      * check if channel is ready
      */
@@ -475,9 +476,11 @@ public class Selector implements Selectable {
     }
 
     /**
-     * Get the channel associated with this numeric id
+     * Get the channel associated with this connection
+     * Exposing this to allow SocketServer get the Principal from the channel when creating a request
+     * without making Selector know about Principals
      */
-    private KafkaChannel channelForId(String id) {
+    public KafkaChannel channelForId(String id) {
         KafkaChannel channel = this.channels.get(id);
         if (channel == null)
             throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet().toString());

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index 277b6ef..b640ea0 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.security.auth;
 import java.security.Principal;
 
 public class KafkaPrincipal implements Principal {
+    public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal("ANONYMOUS");
     private final String name;
 
     public KafkaPrincipal(String name) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2074128..37d9e1f 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -18,6 +18,7 @@
 package kafka.network
 
 import java.nio.ByteBuffer
+import java.security.Principal
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
@@ -29,11 +30,12 @@ import kafka.utils.{Logging, SystemTime}
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.log4j.Logger
 
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(processor = 1, connectionId = "2", buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
+  val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, ""), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
 
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -44,7 +46,9 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, connectionId: String, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
+  case class Session(principal: Principal, host: String)
+
+  case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
     @volatile var requestDequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
@@ -113,11 +117,11 @@ object RequestChannel extends Logging {
       }
 
       if(requestLogger.isTraceEnabled)
-        requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-                .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+        requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
+                .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
       else if(requestLogger.isDebugEnabled)
-        requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-          .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+        requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
+          .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 97b84db..af02c4e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -416,7 +416,10 @@ private[kafka] class Processor(val id: Int,
         }
         collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => {
           try {
-            val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
+
+            val channel = selector.channelForId(receive.source);
+            val session = RequestChannel.Session(channel.principal, channel.socketDescription)
+            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
             requestChannel.sendRequest(req)
           } catch {
             case e @ (_: InvalidRequestException | _: SchemaException) => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e727a6f..a3a8df0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -55,7 +55,8 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handle(request: RequestChannel.Request) {
     try{
-      trace("Handling request: " + request.requestObj + " from connection: " + request.connectionId)
+      trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
+        format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
       request.requestId match {
         case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 1937943..1585e71 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -17,21 +17,15 @@
 
 package kafka.network;
 
-import java.io._
+
 import java.net._
 import javax.net.ssl._
 import java.io._
-import java.nio.ByteBuffer
-import java.util.Random
 
-import kafka.api.ProducerRequest
-import kafka.cluster.EndPoint
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
-import kafka.producer.SyncProducerConfig
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.NetworkSend
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.SystemTime
 import org.junit.Assert._
 import org.junit._
@@ -43,7 +37,6 @@ import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
 import kafka.message.ByteBufferMessageSet
 import kafka.server.KafkaConfig
-import java.nio.channels.SelectionKey
 import kafka.utils.TestUtils
 
 import scala.collection.Map
@@ -230,4 +223,12 @@ class SocketServerTest extends JUnitSuite {
     assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
     overrideServer.shutdown()
   }
+
+  @Test
+  def testSessionPrincipal(): Unit = {
+    val socket = connect()
+    val bytes = new Array[Byte](40)
+    sendRequest(socket, 0, bytes)
+    assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest().session.principal)
+  }
 }