You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Sebastian Puzoń (JIRA)" <ji...@apache.org> on 2018/10/23 08:05:00 UTC
[jira] [Created] (KAFKA-7531) NPE NullPointerException at
TransactionCoordinator handleEndTransaction
Sebastian Puzoń created KAFKA-7531:
--------------------------------------
Summary: NPE NullPointerException at TransactionCoordinator handleEndTransaction
Key: KAFKA-7531
URL: https://issues.apache.org/jira/browse/KAFKA-7531
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 2.0.0
Reporter: Sebastian Puzoń
Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
Streams Application 4 instances, each has 5 Streams threads, total 20 stream threads.
I observe NPE NullPointerException at coordinator broker which causes all application stream threads shutdown, here's stack from broker:
{code:java}
[2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe in group elo
g_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance group elog_agg with old generation 49 (__consumer_offsets-21) (kafka.coordinator.gro
up.GroupCoordinator)
[2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group elog_agg generation 50 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
[2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from leader for group elog_agg for generation 50 (kafka.coordinator.group.GroupCoordina
tor)
[2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on partition _
_transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
[
[2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
ue} (kafka.server.KafkaApis)
java.lang.NullPointerException
at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
at scala.util.Either$RightProjection.flatMap(Either.scala:702)
at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745)
[2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true} (kafka.server.KafkaApis)
java.lang.NullPointerException
at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
at scala.util.Either$RightProjection.flatMap(Either.scala:702)
at kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
at kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
at kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
at kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
at kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745)
[2018-10-22 21:52:27,531] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6804-StreamThread-4-consumer-ae1f00c2-7c2c-4f8e-bed4-20a955ecc122 in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator){code}
On the application side I can see such stack trace:
{code:java}
2018-10-22 21:52:15 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed to commit stream task 0_9 due to the following error:
org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:745)
2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN
2018-10-22 21:52:15 StreamThread [INFO] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Shutting down
2018-10-22 21:52:15 KafkaProducer [INFO] [Producer clientId=elog_agg-client-sswvlp6802-StreamThread-4-0_17-producer, transactionalId=elog_agg-0_17] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-10-22 21:52:16 AssignedStreamsTasks [ERROR] stream-thread [elog_agg-client-sswvlp6802-StreamThread-4] Failed while closing StreamTask 0_9 due to the following error:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679)
at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:563)
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The server experienced an unexpected error when processing the request
at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1189)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:907)
{code}
This way all streams application threads are being shutdown.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)