You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2023/04/14 16:35:53 UTC

[kafka] branch trunk updated: KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests (#13502)

This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cbf360b37dc KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests (#13502)
cbf360b37dc is described below

commit cbf360b37dc422e1b6db6b315cb099bf85115c98
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Apr 14 09:35:45 2023 -0700

    KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests (#13502)
    
    Added AddPartitionsToTxn tests to KafkaApis.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 110 +++++++++++++++++++++
 1 file changed, 110 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 2def5df4603..69052491acc 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2113,6 +2113,116 @@ class KafkaApisTest {
     assertEquals(expectedErrors, response.errors())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+  def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = {
+    val topic = "topic"
+
+    val transactionalId = "txnId1"
+    val producerId = 15L
+    val epoch = 0.toShort
+
+    val tp = new TopicPartition(topic, 0)
+
+    val addPartitionsToTxnRequest = 
+      if (version < 4) 
+        AddPartitionsToTxnRequest.Builder.forClient(
+          transactionalId,
+          producerId,
+          epoch,
+          Collections.singletonList(tp)).build(version)
+      else
+        AddPartitionsToTxnRequest.Builder.forBroker(
+          new AddPartitionsToTxnTransactionCollection(
+            List(new AddPartitionsToTxnTransaction()
+              .setTransactionalId(transactionalId)
+              .setProducerId(producerId)
+              .setProducerEpoch(epoch)
+              .setVerifyOnly(true)
+              .setTopics(new AddPartitionsToTxnTopicCollection(
+                Collections.singletonList(new AddPartitionsToTxnTopic()
+                  .setName(tp.topic)
+                  .setPartitions(Collections.singletonList(tp.partition))
+                ).iterator()))
+            ).asJava.iterator())).build(version)
+
+    val requestChannelRequest = buildRequest(addPartitionsToTxnRequest)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    createKafkaApis(authorizer = Some(authorizer)).handle(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest)
+    val error = if (version < 4) 
+      response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp) 
+    else
+      Errors.forCode(response.data().errorCode)
+      
+    val expectedError = if (version < 4) Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED else Errors.CLUSTER_AUTHORIZATION_FAILED
+    assertEquals(expectedError, error)
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+  def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = {
+    val topic = "topic"
+    addTopicToMetadataCache(topic, numPartitions = 1)
+
+    val transactionalId = "txnId1"
+    val producerId = 15L
+    val epoch = 0.toShort
+
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+
+    val addPartitionsToTxnRequest = if (version < 4)
+      AddPartitionsToTxnRequest.Builder.forClient(
+        transactionalId,
+        producerId,
+        epoch,
+        List(tp0, tp1).asJava).build(version)
+    else
+      AddPartitionsToTxnRequest.Builder.forBroker(
+        new AddPartitionsToTxnTransactionCollection(
+          List(new AddPartitionsToTxnTransaction()
+            .setTransactionalId(transactionalId)
+            .setProducerId(producerId)
+            .setProducerEpoch(epoch)
+            .setVerifyOnly(true)
+            .setTopics(new AddPartitionsToTxnTopicCollection(
+              Collections.singletonList(new AddPartitionsToTxnTopic()
+                .setName(tp0.topic)
+                .setPartitions(List[Integer](tp0.partition, tp1.partition()).asJava)
+              ).iterator()))
+          ).asJava.iterator())).build(version)
+
+    val requestChannelRequest = buildRequest(addPartitionsToTxnRequest)
+
+    createKafkaApis().handleAddPartitionsToTxnRequest(
+      requestChannelRequest,
+      RequestLocal.NoCaching
+    )
+
+    val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest)
+    
+    def checkErrorForTp(tp: TopicPartition, expectedError: Errors): Unit = {
+      val error = if (version < 4)
+        response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)
+      else
+        response.errors().get(transactionalId).get(tp)
+
+      assertEquals(expectedError, error)
+    }
+
+    checkErrorForTp(tp0, Errors.OPERATION_NOT_ATTEMPTED)
+    checkErrorForTp(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+  }
+
   @Test
   def shouldReplaceProducerFencedWithInvalidProducerEpochInEndTxnWithOlderClient(): Unit = {
     val topic = "topic"