You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/18 18:40:09 UTC

kafka git commit: KAFKA-5171; TC should not accept empty string transactional id

Repository: kafka
Updated Branches:
  refs/heads/trunk d662b09c9 -> 34e379f10


KAFKA-5171; TC should not accept empty string transactional id

This is a rebased version of [PR#2973](https://github.com/apache/kafka/pull/2973).

guozhangwang, please review this updated PR.

Author: umesh chaudhary <um...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3086 from umesh9794/mylocal


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34e379f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34e379f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34e379f1

Branch: refs/heads/trunk
Commit: 34e379f10779bfb17fb399fde0357d17dc34ab62
Parents: d662b09
Author: umesh chaudhary <um...@gmail.com>
Authored: Thu May 18 11:38:44 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu May 18 11:38:44 2017 -0700

----------------------------------------------------------------------
 .../coordinator/transaction/TransactionCoordinator.scala    | 9 +++++++--
 .../transaction/TransactionCoordinatorTest.scala            | 6 +++---
 2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/34e379f1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 8de6dbd..491e16a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -93,11 +93,16 @@ class TransactionCoordinator(brokerId: Int,
   def handleInitProducerId(transactionalId: String,
                            transactionTimeoutMs: Int,
                            responseCallback: InitProducerIdCallback): Unit = {
-    if (transactionalId == null || transactionalId.isEmpty) {
-      // if the transactional id is not specified, then always blindly accept the request
+
+    if (transactionalId == null) {
+      // if the transactional id is null, then always blindly accept the request
       // and return a new producerId from the producerId manager
       val producerId = producerIdManager.generateProducerId()
       responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
+    } else if (transactionalId.isEmpty) {
+      //If transactional id is empty then return error as invalid request. This is
+      // to make TransactionCoordinator's behavior consistent with producer client
+      responseCallback(initTransactionError(Errors.INVALID_REQUEST))
     } else if (!txnManager.isCoordinatorFor(transactionalId)) {
       // check if it is the assigned coordinator for the transactional id
       responseCallback(initTransactionError(Errors.NOT_COORDINATOR))

http://git-wip-us.apache.org/repos/asf/kafka/blob/34e379f1/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 04f76bd..43ad7a7 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -86,14 +86,14 @@ class TransactionCoordinatorTest {
   }
 
   @Test
-  def shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsEmpty(): Unit = {
+  def shouldReturnInvalidRequestWhenTransactionalIdIsEmpty(): Unit = {
     mockPidManager()
     EasyMock.replay(pidManager)
 
     coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
-    assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
+    assertEquals(InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result)
     coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
-    assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
+    assertEquals(InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result)
   }
 
   @Test