Posted to dev@kafka.apache.org by "Jason Gustafson (Jira)" <ji...@apache.org> on 2020/07/15 21:25:00 UTC

[jira] [Resolved] (KAFKA-9666) Transactional producer Epoch could not be reset

     [ https://issues.apache.org/jira/browse/KAFKA-9666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-9666.
------------------------------------
    Resolution: Fixed

> Transactional producer Epoch could not be reset
> -----------------------------------------------
>
>                 Key: KAFKA-9666
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9666
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Bob Barrett
>            Priority: Critical
>             Fix For: 2.7.0, 2.6.1
>
>
> As of today, the producer epoch keeps increasing until it reaches Short.MaxValue. At that point, the correct behavior should be to make another call to initialize a new producer ID (PID); otherwise, the request made with epoch Short.MaxValue fails with a fatal exception that eventually kills the producer.
> Stream log:
> [2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) [2020-03-05 04:25:41,147] ERROR [stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3] Thread    StreamsThread threadId: stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3
> [2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) TaskManager
>         MetadataState:
>                 GlobalMetadata: []
>                 GlobalStores: []
>                 My HostInfo: HostInfo{host='unknown', port=-1}
>                 Cluster(id = null, nodes = [], partitions = [], controller = null)
>         Active tasks:
>                 Running:
>                 Running Partitions:
>                 New:
>                 Restoring:
>                 Restoring Partitions:
>                 Restored Partitions:
>                 Suspended:
>         Standby tasks:
>                 Running:
>                 Running Partitions:
>                 New:
>  encountered an error processing soak test (org.apache.kafka.streams.StreamsSoakTest)
> [2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-689bb912-b06c-4c42-88e5-d9578f7ebfdf-StreamThread-3] Failed to rebalance.
>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:862)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:749)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> [2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_soak_i-00d70680c58afa228_streamslog) Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The server experienced an unexpected error when processing the request.
>         at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
>         at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
>         at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:571)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
>         at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
>         at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>         at java.lang.Thread.run(Thread.java:748)
>
> Broker (transaction coordinator) log:
> [2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_broker_i-0d1ef6c9a1a1708f6_server-log) [2020-03-05 04:25:40,885] INFO [Transaction State Manager 1001]: TransactionalId stream-soak-test-1_0 append transaction log for TxnTransitMetadata(producerId=0, producerEpoch=576, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0, stream-soak-test-logData10MinuteFinalCount-store-changelog-0, stream-soak-test-logData10MinuteSuppressedCount-store-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog-0, stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000025-changelog-0, stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000007-changelog-0, windowed-node-counts-0), txnStartTimestamp=1583382340885, txnLastUpdateTimestamp=1583382340885) transition failed due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Ongoing), aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the callback (kafka.coordinator.transaction.TransactionStateManager)
> [2020-03-04T20:25:41-08:00] (streams-soak-2-5-eos_broker_i-0d1ef6c9a1a1708f6_stdout) java.lang.IllegalStateException: Cannot fence producer with epoch equal to Short.MaxValue since this would overflow
>         at kafka.coordinator.transaction.TransactionMetadata.prepareFenceProducerEpoch(TransactionMetadata.scala:194)
>         at kafka.coordinator.transaction.TransactionCoordinator.kafka$coordinator$transaction$TransactionCoordinator$$prepareInitProduceIdTransit(TransactionCoordinator.scala:216)
>         at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2$$anonfun$apply$1.apply(TransactionCoordinator.scala:143)
>         at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2$$anonfun$apply$1.apply(TransactionCoordinator.scala:143)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>         at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>         at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2.apply(TransactionCoordinator.scala:142)
>         at kafka.coordinator.transaction.TransactionCoordinator$$anonfun$2.apply(TransactionCoordinator.scala:138)
>         at scala.util.Either$RightProjection.flatMap(Either.scala:522)
>         at kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:1638)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:135)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:748)
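The following is a minimal sketch, not Kafka's actual code and not the committed fix, that models the two behaviors contrasted in the description. `naiveBump` mirrors the broker-side guard that produced the `IllegalStateException` above: bumping an epoch of Short.MaxValue would overflow, so it throws. `bumpOrReinitialize` models the behavior the report asks for: once the epoch is exhausted, obtain a new producer ID instead of failing. All names here (`EpochBumpSketch`, `ProducerIdAndEpoch`, `IdAllocator`, `naiveBump`, `bumpOrReinitialize`) are hypothetical, and restarting the new ID's epoch at 0 is an assumption of the sketch.

```java
// Hypothetical model of a transactional ID's producer identity.
// This is NOT Kafka's TransactionMetadata / TransactionCoordinator code.
public final class EpochBumpSketch {

    /** Immutable (producerId, producerEpoch) pair. */
    static final class ProducerIdAndEpoch {
        final long producerId;
        final short epoch;

        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    /** Hypothetical stand-in for the broker's producer ID allocator. */
    static final class IdAllocator {
        private long next;

        IdAllocator(long start) { this.next = start; }

        long nextProducerId() { return next++; }
    }

    /** Behavior reported in the issue: an exhausted epoch cannot be bumped, so this throws. */
    static ProducerIdAndEpoch naiveBump(ProducerIdAndEpoch current) {
        if (current.epoch == Short.MAX_VALUE) {
            throw new IllegalStateException(
                "Cannot fence producer with epoch equal to Short.MaxValue since this would overflow");
        }
        return new ProducerIdAndEpoch(current.producerId, (short) (current.epoch + 1));
    }

    /** Requested behavior: on exhaustion, switch to a new producer ID (epoch 0 is an assumption). */
    static ProducerIdAndEpoch bumpOrReinitialize(ProducerIdAndEpoch current, IdAllocator allocator) {
        if (current.epoch == Short.MAX_VALUE) {
            return new ProducerIdAndEpoch(allocator.nextProducerId(), (short) 0);
        }
        return new ProducerIdAndEpoch(current.producerId, (short) (current.epoch + 1));
    }

    public static void main(String[] args) {
        IdAllocator allocator = new IdAllocator(1L);
        ProducerIdAndEpoch exhausted = new ProducerIdAndEpoch(0L, Short.MAX_VALUE);

        try {
            naiveBump(exhausted);
        } catch (IllegalStateException e) {
            // Prints the same message the broker logged above.
            System.out.println("naive: " + e.getMessage());
        }

        ProducerIdAndEpoch next = bumpOrReinitialize(exhausted, allocator);
        // Prints "fixed: producerId=1, epoch=0".
        System.out.println("fixed: producerId=" + next.producerId + ", epoch=" + next.epoch);
    }
}
```

In this sketch the exhausted ID is retired rather than having its epoch wrap around: casting Short.MaxValue + 1 back to a short yields a negative value, which would break the "higher epoch wins" comparison that fencing relies on.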



--
This message was sent by Atlassian Jira
(v8.3.4#803005)