You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/18 18:40:09 UTC
kafka git commit: KAFKA-5171;
TC should not accept empty string transactional id
Repository: kafka
Updated Branches:
refs/heads/trunk d662b09c9 -> 34e379f10
KAFKA-5171; TC should not accept empty string transactional id
This is a rebased version of [PR#2973](https://github.com/apache/kafka/pull/2973).
guozhangwang, please review this updated PR.
Author: umesh chaudhary <um...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #3086 from umesh9794/mylocal
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34e379f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34e379f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34e379f1
Branch: refs/heads/trunk
Commit: 34e379f10779bfb17fb399fde0357d17dc34ab62
Parents: d662b09
Author: umesh chaudhary <um...@gmail.com>
Authored: Thu May 18 11:38:44 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu May 18 11:38:44 2017 -0700
----------------------------------------------------------------------
.../coordinator/transaction/TransactionCoordinator.scala | 9 +++++++--
.../transaction/TransactionCoordinatorTest.scala | 6 +++---
2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/34e379f1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 8de6dbd..491e16a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -93,11 +93,16 @@ class TransactionCoordinator(brokerId: Int,
def handleInitProducerId(transactionalId: String,
transactionTimeoutMs: Int,
responseCallback: InitProducerIdCallback): Unit = {
- if (transactionalId == null || transactionalId.isEmpty) {
- // if the transactional id is not specified, then always blindly accept the request
+
+ if (transactionalId == null) {
+ // if the transactional id is null, then always blindly accept the request
// and return a new producerId from the producerId manager
val producerId = producerIdManager.generateProducerId()
responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE))
+ } else if (transactionalId.isEmpty) {
+ //If transactional id is empty then return error as invalid request. This is
+ // to make TransactionCoordinator's behavior consistent with producer client
+ responseCallback(initTransactionError(Errors.INVALID_REQUEST))
} else if (!txnManager.isCoordinatorFor(transactionalId)) {
// check if it is the assigned coordinator for the transactional id
responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
http://git-wip-us.apache.org/repos/asf/kafka/blob/34e379f1/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 04f76bd..43ad7a7 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -86,14 +86,14 @@ class TransactionCoordinatorTest {
}
@Test
- def shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsEmpty(): Unit = {
+ def shouldReturnInvalidRequestWhenTransactionalIdIsEmpty(): Unit = {
mockPidManager()
EasyMock.replay(pidManager)
coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
- assertEquals(InitProducerIdResult(0L, 0, Errors.NONE), result)
+ assertEquals(InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result)
coordinator.handleInitProducerId("", txnTimeoutMs, initProducerIdMockCallback)
- assertEquals(InitProducerIdResult(1L, 0, Errors.NONE), result)
+ assertEquals(InitProducerIdResult(-1L, -1, Errors.INVALID_REQUEST), result)
}
@Test