You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/13 06:34:47 UTC
[kafka] branch trunk updated: KAFKA-7394; OffsetsForLeaderEpoch supports topic describe access (#5634)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ac7cd16 KAFKA-7394; OffsetsForLeaderEpoch supports topic describe access (#5634)
ac7cd16 is described below
commit ac7cd16e936116ee92ce5f179b671203b3bacf25
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Sep 12 23:34:41 2018 -0700
KAFKA-7394; OffsetsForLeaderEpoch supports topic describe access (#5634)
As part of KIP-320, allow OffsetsForLeaderEpoch requests to be authorized with Topic Describe permission, in addition to the existing Cluster Action permission.
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
core/src/main/scala/kafka/cluster/Replica.scala | 15 +++---
core/src/main/scala/kafka/server/KafkaApis.scala | 42 ++++++++++++----
.../main/scala/kafka/server/ReplicaManager.scala | 1 -
.../kafka/api/AuthorizerIntegrationTest.scala | 56 +++++++++++++++++-----
4 files changed, 85 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 839579b..d729dad 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -194,15 +194,16 @@ class Replica(val brokerId: Int,
override def toString: String = {
val replicaString = new StringBuilder
- replicaString.append("ReplicaId: " + brokerId)
- replicaString.append("; Topic: " + topicPartition.topic)
- replicaString.append("; Partition: " + topicPartition.partition)
- replicaString.append("; isLocal: " + isLocal)
- replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
+ replicaString.append("Replica(replicaId=" + brokerId)
+ replicaString.append(s", topic=${topicPartition.topic}")
+ replicaString.append(s", partition=${topicPartition.partition}")
+ replicaString.append(s", isLocal=$isLocal")
+ replicaString.append(s", lastCaughtUpTimeMs=$lastCaughtUpTimeMs")
if (isLocal) {
- replicaString.append("; Highwatermark: " + highWatermark)
- replicaString.append("; LastStableOffset: " + lastStableOffset)
+ replicaString.append(s", highWatermark=$highWatermark")
+ replicaString.append(s", lastStableOffset=$lastStableOffset")
}
+ replicaString.append(")")
replicaString.toString
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5a82cc4..afbe5b8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -181,7 +181,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ if (isAuthorizedClusterAction(request)) {
val response = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
sendResponseExemptThrottle(request, response)
} else {
@@ -196,7 +196,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ if (isAuthorizedClusterAction(request)) {
val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
// Clearing out the cache for groups that belong to an offsets topic partition for which this broker was the leader,
// since this broker is no longer a replica for that offsets topic partition.
@@ -223,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val correlationId = request.header.correlationId
val updateMetadataRequest = request.body[UpdateMetadataRequest]
- if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+ if (isAuthorizedClusterAction(request)) {
val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
if (deletedPartitions.nonEmpty)
groupCoordinator.handleDeletedPartitions(deletedPartitions)
@@ -2038,12 +2038,27 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
- val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition()
- authorizeClusterAction(request)
+ val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition.asScala
+
+ // The OffsetsForLeaderEpoch API was initially only used for inter-broker communication and required
+ // cluster permission. With KIP-320, the consumer now also uses this API to check for log truncation
+ // following a leader change, so we also allow topic describe permission.
+ val (authorizedPartitions, unauthorizedPartitions) = if (isAuthorizedClusterAction(request)) {
+ (requestInfo, Map.empty[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData])
+ } else {
+ requestInfo.partition {
+ case (tp, _) => authorize(request.session, Describe, Resource(Topic, tp.topic, LITERAL))
+ }
+ }
- val lastOffsetForLeaderEpoch = replicaManager.lastOffsetForLeaderEpoch(requestInfo.asScala).asJava
+ val endOffsetsForAuthorizedPartitions = replicaManager.lastOffsetForLeaderEpoch(authorizedPartitions)
+ val endOffsetsForUnauthorizedPartitions = unauthorizedPartitions.mapValues(_ =>
+ new EpochEndOffset(Errors.TOPIC_AUTHORIZATION_FAILED, EpochEndOffset.UNDEFINED_EPOCH,
+ EpochEndOffset.UNDEFINED_EPOCH_OFFSET))
+
+ val endOffsetsForAllPartitions = endOffsetsForAuthorizedPartitions ++ endOffsetsForUnauthorizedPartitions
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new OffsetsForLeaderEpochResponse(requestThrottleMs, lastOffsetForLeaderEpoch))
+ new OffsetsForLeaderEpochResponse(requestThrottleMs, endOffsetsForAllPartitions.asJava))
}
def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -2247,10 +2262,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
- if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
+ if (!isAuthorizedClusterAction(request))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
+ private def isAuthorizedClusterAction(request: RequestChannel.Request): Boolean = {
+ authorize(request.session, ClusterAction, Resource.ClusterResource)
+ }
+
def authorizeClusterAlter(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, Alter, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
@@ -2283,7 +2302,11 @@ class KafkaApis(val requestChannel: RequestChannel,
private def handleError(request: RequestChannel.Request, e: Throwable) {
val mayThrottle = e.isInstanceOf[ClusterAuthorizationException] || !request.header.apiKey.clusterAction
- error("Error when handling request %s".format(request.body[AbstractRequest]), e)
+ error("Error when handling request: " +
+ s"clientId=${request.header.clientId}, " +
+ s"correlationId=${request.header.correlationId}, " +
+ s"api=${request.header.apiKey}, " +
+ s"body=${request.body[AbstractRequest]}", e)
if (mayThrottle)
sendErrorResponseMaybeThrottle(request, e)
else
@@ -2361,4 +2384,5 @@ class KafkaApis(val requestChannel: RequestChannel,
private def sendResponse(response: RequestChannel.Response): Unit = {
requestChannel.sendResponse(response)
}
+
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d61240e..114e69c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1494,4 +1494,3 @@ class ReplicaManager(val config: KafkaConfig,
}
}
}
-
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index cfc6b5e..dbe2a02 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -211,7 +211,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CREATE_TOPICS -> topicCreateAcl,
ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
ApiKeys.DELETE_RECORDS -> topicDeleteAcl,
- ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> topicDescribeAcl,
ApiKeys.DESCRIBE_CONFIGS -> topicDescribeConfigsAcl,
ApiKeys.ALTER_CONFIGS -> topicAlterConfigsAcl,
ApiKeys.INIT_PRODUCER_ID -> (transactionIdWriteAcl ++ clusterIdempotentWriteAcl),
@@ -277,7 +277,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
build()
}
- private def offsetsForLeaderEpochRequest = {
+ private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion)
.add(tp, Optional.of(27), 7).build()
}
@@ -382,7 +382,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
-
@Test
def testAuthorizationWithTopicExisting() {
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
@@ -400,7 +399,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.HEARTBEAT -> heartbeatRequest,
ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest,
- ApiKeys.STOP_REPLICA -> stopReplicaRequest,
ApiKeys.CONTROLLED_SHUTDOWN -> controlledShutdownRequest,
ApiKeys.CREATE_TOPICS -> createTopicsRequest,
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
@@ -415,7 +413,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
- ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest
+ ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
+
+ // Check StopReplica last since some APIs depend on replica availability
+ ApiKeys.STOP_REPLICA -> stopReplicaRequest
)
for ((key, request) <- requestKeyToRequest) {
@@ -426,7 +427,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).foreach { acls =>
val describeAcls = topicDescribeAcl(topicResource)
- val isAuthorized = describeAcls == acls
+ val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized)
removeAllAcls()
@@ -460,7 +461,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
- ApiKeys.DELETE_GROUPS -> deleteGroupsRequest
+ ApiKeys.DELETE_GROUPS -> deleteGroupsRequest,
+ ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest
)
for ((key, request) <- requestKeyToRequest) {
@@ -504,11 +506,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
- val readAcls = topicReadAcl.get(topicResource).get
+ val readAcls = topicReadAcl(topicResource)
addAndVerifyAcls(readAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
- val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
+ val clusterAcls = clusterAcl(Resource.ClusterResource)
+ addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
+ }
+
+ @Test
+ def testOffsetsForLeaderEpochClusterPermission(): Unit = {
+ val key = ApiKeys.OFFSET_FOR_LEADER_EPOCH
+ val request = offsetsForLeaderEpochRequest
+
+ removeAllAcls()
+
+ val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
+
+ // Although the OffsetsForLeaderEpoch API now accepts topic describe, we should continue
+ // allowing cluster action for backwards compatibility
+ val clusterAcls = clusterAcl(Resource.ClusterResource)
addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
}
@@ -1010,18 +1029,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test(expected = classOf[TopicAuthorizationException])
- def testListOffsetsWithNoTopicAccess() {
+ def testMetadataWithNoTopicAccess() {
val consumer = createConsumer()
consumer.partitionsFor(topic)
}
@Test
- def testListOffsetsWithTopicDescribe() {
+ def testMetadataWithTopicDescribe() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
val consumer = createConsumer()
consumer.partitionsFor(topic)
}
+ @Test(expected = classOf[TopicAuthorizationException])
+ def testListOffsetsWithNoTopicAccess() {
+ val consumer = createConsumer()
+ consumer.endOffsets(Set(tp).asJava)
+ }
+
+ @Test
+ def testListOffsetsWithTopicDescribe() {
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
+ val consumer = createConsumer()
+ consumer.endOffsets(Set(tp).asJava)
+ }
+
@Test
def testDescribeGroupApiWithNoGroupAcl() {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), topicResource)
@@ -1434,7 +1466,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val resp = connectAndSend(request, apiKey)
val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
null, resp, request.version: java.lang.Short).asInstanceOf[AbstractResponse]
- val error = requestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
+ val error = requestKeyToError(apiKey).asInstanceOf[AbstractResponse => Errors](response)
val authorizationErrors = resources.flatMap { resourceType =>
if (resourceType == Topic) {