Posted to jira@kafka.apache.org by "Jason Gustafson (JIRA)" <ji...@apache.org> on 2018/11/02 00:43:00 UTC

[jira] [Comment Edited] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

    [ https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672410#comment-16672410 ] 

Jason Gustafson edited comment on KAFKA-7531 at 11/2/18 12:43 AM:
------------------------------------------------------------------

[~spuzon] Can you post the git commit id for the broker? It should be in the log when the server starts up. The reason I ask is that the line numbers in the trace do not appear to match the code in 2.0.0. 
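
One way to pull the commit id out of a broker log is sketched below. This is only an illustration: it assumes the broker prints a startup line of the form "Kafka commitId : <id>" (the exact spacing may differ between versions), and the sample log lines, their timestamps, and the commit id value are hypothetical placeholders, not output from the reporter's cluster.

```python
import re
from typing import Optional

# Assumed line shape: "Kafka commitId : <id>" or "Kafka commitId: <id>".
COMMIT_ID_RE = re.compile(r"Kafka commitId\s*:\s*(\S+)")


def find_commit_id(log_text: str) -> Optional[str]:
    """Return the first commit id announced in the log text, or None if absent."""
    match = COMMIT_ID_RE.search(log_text)
    return match.group(1) if match else None


# Hypothetical startup lines; the commit id below is a placeholder value.
sample_log = (
    "[2018-10-22 21:40:00,000] INFO Kafka version : 2.0.0 "
    "(org.apache.kafka.common.utils.AppInfoParser)\n"
    "[2018-10-22 21:40:00,000] INFO Kafka commitId : 0123456789abcdef "
    "(org.apache.kafka.common.utils.AppInfoParser)\n"
)

print(find_commit_id(sample_log))
```

Run against the real server log (for example by reading the file and passing its contents to find_commit_id), this would return the id to post on the ticket, or None if no such line is present.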


> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-7531
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7531
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 2.0.0
>            Reporter: Sebastian Puzoń
>            Priority: Critical
>             Fix For: 2.1.1, 2.0.2
>
>
> A Kafka cluster with 4 brokers, 1 topic (20 partitions), and 1 ZooKeeper instance.
> A Streams application with 4 instances, each running 5 stream threads, for a total of 20 stream threads.
> I observe a NullPointerException (NPE) on the coordinator broker, which causes all of the application's stream threads to shut down. Here is the stack trace from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance group elog_agg with old generation 49 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group elog_agg generation 50 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from leader for group elog_agg for generation 50 (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on partition __transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=true} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:27,531] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6804-StreamThread-4-consumer-ae1f00c2-7c2c-4f8e-bed4-20a955ecc122 in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> {code}
>  
>  
> On the application side I see the following stack trace:
>  
>  
> {code:java}
> 2018-10-22 21:52:15 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed to commit stream task 0_9 due to the following error:
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
> at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
> at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> at java.lang.Thread.run(Thread.java:745)
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN
> 2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Shutting down
> 2018-10-22 21:52:15 KafkaProducer [INFO] [Producer clientId=elog_agg-client-sswvlp6802-StreamThread-4-0_17-producer, transactionalId=elog_agg-0_17] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-10-22 21:52:16 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed while closing StreamTask 0_9 due to the following error:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
> at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
> at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
> at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679)
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:563)
> at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
> at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
> at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
> at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> Caused by: org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
> at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
> at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
> {code}
> As a result, all of the Streams application's threads shut down.
>  
>  
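
To check whether the line numbers in a trace match a given source checkout, it can help to list only the frames that fall in one source file. The sketch below does that for the first few frames of the broker trace quoted above. The helper name frames_for_file and the regular expression are illustrative assumptions, not part of Kafka.

```python
import re
from typing import List, Tuple

# Matches JVM stack frames such as:
#   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
FRAME_RE = re.compile(r"at\s+(\S+)\(([\w$]+\.(?:scala|java)):(\d+)\)")


def frames_for_file(trace: str, file_name: str) -> List[Tuple[str, int]]:
    """Return unique (method, line) pairs for frames in file_name, in trace order."""
    seen = set()
    result = []
    for method, source, line in FRAME_RE.findall(trace):
        key = (method, int(line))
        if source == file_name and key not in seen:
            seen.add(key)
            result.append(key)
    return result


# First frames of the broker trace from the issue description.
trace = """\
java.lang.NullPointerException
 at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
 at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
 at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
"""

for method, line in frames_for_file(trace, "TransactionCoordinator.scala"):
    print(f"{line}: {method}")
```

The printed line numbers (393 and 383 for this excerpt) can then be compared by hand against TransactionCoordinator.scala at the commit the broker was built from.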


