You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/26 22:49:39 UTC
kafka git commit: KAFKA-1683; persisting session information in Requests
Repository: kafka
Updated Branches:
refs/heads/trunk 436b7ddc3 -> 8b538d62b
KAFKA-1683; persisting session information in Requests
Author: Gwen Shapira <cs...@gmail.com>
Reviewers: Sriharsha Chintalapani, Ismael Juma, Edward Ribeiro, Parth Brahmbhatt, Jun Rao
Closes #155 from gwenshap/KAFKA-1683
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b538d62
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b538d62
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b538d62
Branch: refs/heads/trunk
Commit: 8b538d62bd7ddcefd177bfed01b10b39fc971bb1
Parents: 436b7dd
Author: Gwen Shapira <cs...@gmail.com>
Authored: Wed Aug 26 13:49:33 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Aug 26 13:49:33 2015 -0700
----------------------------------------------------------------------
.../common/network/PlaintextTransportLayer.java | 2 +-
.../kafka/common/network/SSLTransportLayer.java | 4 +++-
.../apache/kafka/common/network/Selector.java | 7 +++++--
.../common/security/auth/KafkaPrincipal.java | 1 +
.../scala/kafka/network/RequestChannel.scala | 16 ++++++++++------
.../main/scala/kafka/network/SocketServer.scala | 5 ++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 3 ++-
.../unit/kafka/network/SocketServerTest.scala | 19 ++++++++++---------
8 files changed, 36 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index a3567af..35d4168 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -37,7 +37,7 @@ public class PlaintextTransportLayer implements TransportLayer {
private static final Logger log = LoggerFactory.getLogger(PlaintextTransportLayer.class);
private final SelectionKey key;
private final SocketChannel socketChannel;
- private final Principal principal = new KafkaPrincipal("ANONYMOUS");
+ private final Principal principal = KafkaPrincipal.ANONYMOUS;
public PlaintextTransportLayer(SelectionKey key) throws IOException {
this.key = key;
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
index 8ba1b01..f1cd607 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java
@@ -33,6 +33,7 @@ import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLPeerUnverifiedException;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -600,7 +601,8 @@ public class SSLTransportLayer implements TransportLayer {
try {
return sslEngine.getSession().getPeerPrincipal();
} catch (SSLPeerUnverifiedException se) {
- throw new IOException(String.format("Unable to retrieve getPeerPrincipal due to %s", se));
+ log.warn("SSL peer is not authenticated, returning ANONYMOUS instead");
+ return KafkaPrincipal.ANONYMOUS;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 12c911c..f49d54c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -465,6 +465,7 @@ public class Selector implements Selectable {
this.sensors.connectionClosed.record();
}
+
/**
* check if channel is ready
*/
@@ -475,9 +476,11 @@ public class Selector implements Selectable {
}
/**
- * Get the channel associated with this numeric id
+ * Get the channel associated with this connection
+ * Exposing this to allow SocketServer get the Principal from the channel when creating a request
+ * without making Selector know about Principals
*/
- private KafkaChannel channelForId(String id) {
+ public KafkaChannel channelForId(String id) {
KafkaChannel channel = this.channels.get(id);
if (channel == null)
throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet().toString());
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
index 277b6ef..b640ea0 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.security.auth;
import java.security.Principal;
public class KafkaPrincipal implements Principal {
+ public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal("ANONYMOUS");
private final String name;
public KafkaPrincipal(String name) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2074128..37d9e1f 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -18,6 +18,7 @@
package kafka.network
import java.nio.ByteBuffer
+import java.security.Principal
import java.util.concurrent._
import com.yammer.metrics.core.Gauge
@@ -29,11 +30,12 @@ import kafka.utils.{Logging, SystemTime}
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.log4j.Logger
object RequestChannel extends Logging {
- val AllDone = new Request(processor = 1, connectionId = "2", buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
+ val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, ""), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
def getShutdownReceive() = {
val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -44,7 +46,9 @@ object RequestChannel extends Logging {
byteBuffer
}
- case class Request(processor: Int, connectionId: String, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
+ case class Session(principal: Principal, host: String)
+
+ case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
@volatile var requestDequeueTimeMs = -1L
@volatile var apiLocalCompleteTimeMs = -1L
@volatile var responseCompleteTimeMs = -1L
@@ -113,11 +117,11 @@ object RequestChannel extends Logging {
}
if(requestLogger.isTraceEnabled)
- requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
- .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+ requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
+ .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
else if(requestLogger.isDebugEnabled)
- requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
- .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+ requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
+ .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 97b84db..af02c4e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -416,7 +416,10 @@ private[kafka] class Processor(val id: Int,
}
collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach(receive => {
try {
- val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
+
+ val channel = selector.channelForId(receive.source);
+ val session = RequestChannel.Session(channel.principal, channel.socketDescription)
+ val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) => {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e727a6f..a3a8df0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -55,7 +55,8 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handle(request: RequestChannel.Request) {
try{
- trace("Handling request: " + request.requestObj + " from connection: " + request.connectionId)
+ trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
+ format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b538d62/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 1937943..1585e71 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -17,21 +17,15 @@
package kafka.network;
-import java.io._
+
import java.net._
import javax.net.ssl._
import java.io._
-import java.nio.ByteBuffer
-import java.util.Random
-import kafka.api.ProducerRequest
-import kafka.cluster.EndPoint
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
-import kafka.producer.SyncProducerConfig
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.NetworkSend
import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.SystemTime
import org.junit.Assert._
import org.junit._
@@ -43,7 +37,6 @@ import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
import kafka.message.ByteBufferMessageSet
import kafka.server.KafkaConfig
-import java.nio.channels.SelectionKey
import kafka.utils.TestUtils
import scala.collection.Map
@@ -230,4 +223,12 @@ class SocketServerTest extends JUnitSuite {
assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
overrideServer.shutdown()
}
+
+ @Test
+ def testSessionPrincipal(): Unit = {
+ val socket = connect()
+ val bytes = new Array[Byte](40)
+ sendRequest(socket, 0, bytes)
+ assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest().session.principal)
+ }
}