Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/13 06:34:47 UTC

[kafka] branch trunk updated: KAFKA-7394; OffsetsForLeaderEpoch supports topic describe access (#5634)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ac7cd16  KAFKA-7394; OffsetsForLeaderEpoch supports topic describe access (#5634)
ac7cd16 is described below

commit ac7cd16e936116ee92ce5f179b671203b3bacf25
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Sep 12 23:34:41 2018 -0700

    KAFKA-7394; OffsetsForLeaderEpoch supports topic describe access (#5634)
    
    As part of KIP-320, allow OffsetsForLeaderEpoch requests with Topic Describe permission.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 core/src/main/scala/kafka/cluster/Replica.scala    | 15 +++---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 42 ++++++++++++----
 .../main/scala/kafka/server/ReplicaManager.scala   |  1 -
 .../kafka/api/AuthorizerIntegrationTest.scala      | 56 +++++++++++++++++-----
 4 files changed, 85 insertions(+), 29 deletions(-)
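
For readers skimming the diff, the heart of the patch is the hunk in KafkaApis.handleOffsetForLeaderEpochRequest below: with KIP-320 the consumer also issues OffsetsForLeaderEpoch to detect log truncation after a leader change, so the handler now splits the requested partitions by topic Describe permission instead of requiring ClusterAction outright. The following is a condensed sketch of that flow; the object name, the stand-in types (TopicPartition, PartitionData, EpochEndOffset here) and the injected authorizer/lookup functions are hypothetical simplifications, not the broker's actual classes.

// A condensed, hypothetical sketch of the new authorization flow in
// KafkaApis.handleOffsetForLeaderEpochRequest. All types here are simplified
// stand-ins for illustration, not the broker's real classes.
object OffsetsForLeaderEpochAuthSketch {

  case class TopicPartition(topic: String, partition: Int)
  case class PartitionData(leaderEpoch: Int)

  sealed trait EpochEndOffset
  case class EndOffset(leaderEpoch: Int, endOffset: Long) extends EpochEndOffset
  case object TopicAuthorizationFailed extends EpochEndOffset

  def handle(requestInfo: Map[TopicPartition, PartitionData],
             hasClusterAction: Boolean,           // true for inter-broker callers
             hasTopicDescribe: String => Boolean, // per-topic Describe check (KIP-320 consumers)
             lookupEndOffsets: Map[TopicPartition, PartitionData] => Map[TopicPartition, EpochEndOffset])
      : Map[TopicPartition, EpochEndOffset] = {

    // ClusterAction keeps full access for inter-broker use; otherwise the
    // requested partitions are split by whether the caller may Describe the topic.
    val (authorized, unauthorized) =
      if (hasClusterAction) (requestInfo, Map.empty[TopicPartition, PartitionData])
      else requestInfo.partition { case (tp, _) => hasTopicDescribe(tp.topic) }

    // Unauthorized partitions receive per-partition TOPIC_AUTHORIZATION_FAILED
    // entries instead of the whole request being rejected.
    lookupEndOffsets(authorized) ++
      unauthorized.map { case (tp, _) => tp -> TopicAuthorizationFailed }
  }
}

In the real handler, as the hunk below shows, the Describe check is authorize(request.session, Describe, Resource(Topic, tp.topic, LITERAL)) and the end-offset lookup is replicaManager.lastOffsetForLeaderEpoch.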

diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 839579b..d729dad 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -194,15 +194,16 @@ class Replica(val brokerId: Int,
 
   override def toString: String = {
     val replicaString = new StringBuilder
-    replicaString.append("ReplicaId: " + brokerId)
-    replicaString.append("; Topic: " + topicPartition.topic)
-    replicaString.append("; Partition: " + topicPartition.partition)
-    replicaString.append("; isLocal: " + isLocal)
-    replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
+    replicaString.append("Replica(replicaId=" + brokerId)
+    replicaString.append(s", topic=${topicPartition.topic}")
+    replicaString.append(s", partition=${topicPartition.partition}")
+    replicaString.append(s", isLocal=$isLocal")
+    replicaString.append(s", lastCaughtUpTimeMs=$lastCaughtUpTimeMs")
     if (isLocal) {
-      replicaString.append("; Highwatermark: " + highWatermark)
-      replicaString.append("; LastStableOffset: " + lastStableOffset)
+      replicaString.append(s", highWatermark=$highWatermark")
+      replicaString.append(s", lastStableOffset=$lastStableOffset")
     }
+    replicaString.append(")")
     replicaString.toString
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5a82cc4..afbe5b8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -181,7 +181,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+    if (isAuthorizedClusterAction(request)) {
       val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
       sendResponseExemptThrottle(request, response)
     } else {
@@ -196,7 +196,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.body[StopReplicaRequest]
 
-    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+    if (isAuthorizedClusterAction(request)) {
       val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
       // Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
       // since this broker is no longer a replica for that offsets topic partition.
@@ -223,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val correlationId = request.header.correlationId
     val updateMetadataRequest = request.body[UpdateMetadataRequest]
 
-    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+    if (isAuthorizedClusterAction(request)) {
       val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
       if (deletedPartitions.nonEmpty)
         groupCoordinator.handleDeletedPartitions(deletedPartitions)
@@ -2038,12 +2038,27 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
     val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
-    val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition()
-    authorizeClusterAction(request)
+    val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition.asScala
+
+    // The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required
+    // cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation
+    // following a leader change, so we also allow topic describe permission.
+    val (authorizedPartitions, unauthorizedPartitions) = if (isAuthorizedClusterAction(request)) {
+      (requestInfo, Map.empty[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData])
+    } else {
+      requestInfo.partition {
+        case (tp, _) => authorize(request.session, Describe, Resource(Topic, tp.topic, LITERAL))
+      }
+    }
 
-    val lastOffsetForLeaderEpoch = replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
+    val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedPartitions)
+    val endOffsetsForUnauthorizedPartitions = unauthorizedPartitions.mapValues(_ =>
+      new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED, EpochEndOffset.UNDEFINED_EPOCH,
+        EpochEndOffset.UNDEFINED_EPOCH_OFFSET))
+
+    val endOffsetsForAllPartitions = endOffsetsForAuthorizedPartitions ++ endOffsetsForUnauthorizedPartitions
     sendResponseMaybeThrottle(request, requestThrottleMs =>
-      new OffsetsForLeaderEpochResponse(requestThrottleMs, lastOffsetForLeaderEpoch))
+      new OffsetsForLeaderEpochResponse(requestThrottleMs, endOffsetsForAllPartitions.asJava))
   }
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -2247,10 +2262,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
-    if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
+    if (!isAuthorizedClusterAction(request))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
 
+  private def isAuthorizedClusterAction(request: RequestChannel.Request): Boolean = {
+    authorize(request.session, ClusterAction, Resource.ClusterResource)
+  }
+
   def authorizeClusterAlter(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, Alter, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
@@ -2283,7 +2302,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def handleError(request: RequestChannel.Request, e: Throwable) {
     val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction
-    error("Error when handling request %s".format(request.body[AbstractRequest]), e)
+    error("Error when handling request: " +
+      s"clientId=${request.header.clientId}, " +
+      s"correlationId=${request.header.correlationId}, " +
+      s"api=${request.header.apiKey}, " +
+      s"body=${request.body[AbstractRequest]}", e)
     if (mayThrottle)
       sendErrorResponseMaybeThrottle(request, e)
     else
@@ -2361,4 +2384,5 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def sendResponse(response: RequestChannel.Response): Unit = {
     requestChannel.sendResponse(response)
   }
+
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d61240e..114e69c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1494,4 +1494,3 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 }
-
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index cfc6b5e..dbe2a02 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -211,7 +211,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.CREATE_TOPICS -> topicCreateAcl,
     ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
     ApiKeys.DELETE_RECORDS -> topicDeleteAcl,
-    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
+    ApiKeys.OFFSET_FOR_LEADER_EPOCH -> topicDescribeAcl,
     ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl,
     ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl,
     ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++ clusterIdempotentWriteAcl),
@@ -277,7 +277,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       build()
   }
 
-  private def offsetsForLeaderEpochRequest = {
+  private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
     new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion)
       .add(tp, Optional.of(27), 7).build()
   }
@@ -382,7 +382,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
 
-
   @Test
   def testAuthorizationWithTopicExisting() {
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@@ -400,7 +399,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.HEARTBEAT -> heartbeatRequest,
       ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
       ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest,
-      ApiKeys.STOP_REPLICA -> stopReplicaRequest,
       ApiKeys.CONTROLLED_SHUTDOWN -> controlledShutdownRequest,
       ApiKeys.CREATE_TOPICS -> createTopicsRequest,
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
@@ -415,7 +413,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
       ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
       ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
-      ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest
+      ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
+
+      // Check StopReplica last since some APIs depend on replica availability
+      ApiKeys.STOP_REPLICA -> stopReplicaRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
@@ -426,7 +427,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       val resourceToAcls = requestKeysToAcls(key)
       resourceToAcls.get(topicResource).foreach { acls =>
         val describeAcls = topicDescribeAcl(topicResource)
-        val isAuthorized =  describeAcls == acls
+        val isAuthorized = describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
         sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized)
         removeAllAcls()
@@ -460,7 +461,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
       ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
       ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
-      ApiKeys.DELETE_GROUPS -> deleteGroupsRequest
+      ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
@@ -504,11 +506,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
 
-    val readAcls = topicReadAcl.get(topicResource).get
+    val readAcls = topicReadAcl(topicResource)
     addAndVerifyAcls(readAcls, topicResource)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
 
-    val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
+    val clusterAcls = clusterAcl(Resource.ClusterResource)
+    addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
+  }
+
+  @Test
+  def testOffsetsForLeaderEpochClusterPermission(): Unit = {
+    val key = ApiKeys.OFFSET_FOR_LEADER_EPOCH
+    val request = offsetsForLeaderEpochRequest
+
+    removeAllAcls()
+
+    val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
+    sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
+
+    // Although the OffsetsForLeaderEpoch API now accepts topic describe, we should continue
+    // allowing cluster action for backwards compatibility
+    val clusterAcls = clusterAcl(Resource.ClusterResource)
     addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
     sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
   }
@@ -1010,18 +1029,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test(expected = classOf[TopicAuthorizationException])
-  def testListOffsetsWithNoTopicAccess() {
+  def testMetadataWithNoTopicAccess() {
     val consumer = createConsumer()
     consumer.partitionsFor(topic)
   }
 
   @Test
-  def testListOffsetsWithTopicDescribe() {
+  def testMetadataWithTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
     val consumer = createConsumer()
     consumer.partitionsFor(topic)
   }
 
+  @Test(expected = classOf[TopicAuthorizationException])
+  def testListOffsetsWithNoTopicAccess() {
+    val consumer = createConsumer()
+    consumer.endOffsets(Set(tp).asJava)
+  }
+
+  @Test
+  def testListOffsetsWithTopicDescribe() {
+    addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
+    val consumer = createConsumer()
+    consumer.endOffsets(Set(tp).asJava)
+  }
+
   @Test
   def testDescribeGroupApiWithNoGroupAcl() {
     addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
@@ -1434,7 +1466,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val resp = connectAndSend(request, apiKey)
     val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
       null, resp, request.version: java.lang.Short).asInstanceOf[AbstractResponse]
-    val error = requestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
+    val error = requestKeyToError(apiKey).asInstanceOf[AbstractResponse => Errors](response)
 
     val authorizationErrors = resources.flatMap { resourceType =>
       if (resourceType == Topic) {