You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2023/04/14 16:35:53 UTC
[kafka] branch trunk updated: KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests (#13502)
This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cbf360b37dc KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests (#13502)
cbf360b37dc is described below
commit cbf360b37dc422e1b6db6b315cb099bf85115c98
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Apr 14 09:35:45 2023 -0700
KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests (#13502)
Added AddPartitionsToTxn tests to KafkaApis.
Reviewers: David Jacot <dj...@confluent.io>
---
.../scala/unit/kafka/server/KafkaApisTest.scala | 110 +++++++++++++++++++++
1 file changed, 110 insertions(+)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 2def5df4603..69052491acc 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2113,6 +2113,116 @@ class KafkaApisTest {
assertEquals(expectedErrors, response.errors())
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) // presumably supplies each supported version of this API as `version` — confirm against ApiKeyVersionsSource
+ def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { // checks the error reported when the mocked authorizer answers DENIED
+ val topic = "topic"
+
+ val transactionalId = "txnId1"
+ val producerId = 15L
+ val epoch = 0.toShort
+
+ val tp = new TopicPartition(topic, 0)
+
+ val addPartitionsToTxnRequest =
+ if (version < 4) // versions below 4 are built with forClient, 4 and above with forBroker
+ AddPartitionsToTxnRequest.Builder.forClient(
+ transactionalId,
+ producerId,
+ epoch,
+ Collections.singletonList(tp)).build(version)
+ else
+ AddPartitionsToTxnRequest.Builder.forBroker(
+ new AddPartitionsToTxnTransactionCollection(
+ List(new AddPartitionsToTxnTransaction() // a single transaction carrying the one topic-partition
+ .setTransactionalId(transactionalId)
+ .setProducerId(producerId)
+ .setProducerEpoch(epoch)
+ .setVerifyOnly(true)
+ .setTopics(new AddPartitionsToTxnTopicCollection(
+ Collections.singletonList(new AddPartitionsToTxnTopic()
+ .setName(tp.topic)
+ .setPartitions(Collections.singletonList(tp.partition))
+ ).iterator()))
+ ).asJava.iterator())).build(version)
+
+ val requestChannelRequest = buildRequest(addPartitionsToTxnRequest)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) // any authorize call on the mock ...
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava) // ... returns a one-element list holding DENIED
+
+ createKafkaApis(authorizer = Some(authorizer)).handle( // request is dispatched through the general handle(...) entry point
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
+
+ val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest)
+ val error = if (version < 4) // below v4 the error is read per partition under the V3_AND_BELOW_TXN_ID key
+ response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)
+ else // from v4 on the error is read from the response's top-level error code
+ Errors.forCode(response.data().errorCode)
+
+ val expectedError = if (version < 4) Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED else Errors.CLUSTER_AUTHORIZATION_FAILED // expected failure type differs by version
+ assertEquals(expectedError, error)
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) // presumably supplies each supported version of this API as `version` — confirm against ApiKeyVersionsSource
+ def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = { // requests partitions 0 and 1 of a topic added with numPartitions = 1 and checks each partition's error
+ val topic = "topic"
+ addTopicToMetadataCache(topic, numPartitions = 1) // helper defined elsewhere; called here with a single partition
+
+ val transactionalId = "txnId1"
+ val producerId = 15L
+ val epoch = 0.toShort
+
+ val tp0 = new TopicPartition(topic, 0)
+ val tp1 = new TopicPartition(topic, 1) // index 1 is beyond numPartitions = 1 — presumably why it is expected to be unknown below
+
+ val addPartitionsToTxnRequest = if (version < 4) // versions below 4 are built with forClient, 4 and above with forBroker
+ AddPartitionsToTxnRequest.Builder.forClient(
+ transactionalId,
+ producerId,
+ epoch,
+ List(tp0, tp1).asJava).build(version)
+ else
+ AddPartitionsToTxnRequest.Builder.forBroker(
+ new AddPartitionsToTxnTransactionCollection(
+ List(new AddPartitionsToTxnTransaction() // a single transaction naming both partitions of the topic
+ .setTransactionalId(transactionalId)
+ .setProducerId(producerId)
+ .setProducerEpoch(epoch)
+ .setVerifyOnly(true)
+ .setTopics(new AddPartitionsToTxnTopicCollection(
+ Collections.singletonList(new AddPartitionsToTxnTopic()
+ .setName(tp0.topic)
+ .setPartitions(List[Integer](tp0.partition, tp1.partition()).asJava)
+ ).iterator()))
+ ).asJava.iterator())).build(version)
+
+ val requestChannelRequest = buildRequest(addPartitionsToTxnRequest)
+
+ createKafkaApis().handleAddPartitionsToTxnRequest( // invokes the AddPartitionsToTxn handler method directly, with no authorizer argument
+ requestChannelRequest,
+ RequestLocal.NoCaching
+ )
+
+ val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest)
+
+ def checkErrorForTp(tp: TopicPartition, expectedError: Errors): Unit = { // asserts the error recorded for tp in the response
+ val error = if (version < 4) // below v4 errors are keyed by V3_AND_BELOW_TXN_ID
+ response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)
+ else // from v4 on errors are keyed by the request's transactional id
+ response.errors().get(transactionalId).get(tp)
+
+ assertEquals(expectedError, error)
+ }
+
+ checkErrorForTp(tp0, Errors.OPERATION_NOT_ATTEMPTED) // partition 0 is expected to be reported as not attempted
+ checkErrorForTp(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION) // partition 1 is expected to be reported as unknown
+ }
+
@Test
def shouldReplaceProducerFencedWithInvalidProducerEpochInEndTxnWithOlderClient(): Unit = {
val topic = "topic"