Posted to jira@kafka.apache.org by "Peter Davis (JIRA)" <ji...@apache.org> on 2019/02/16 00:59:00 UTC

[jira] [Comment Edited] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

    [ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769912#comment-16769912 ] 

Peter Davis edited comment on KAFKA-7088 at 2/16/19 12:58 AM:
--------------------------------------------------------------

Encountered this bug today with a 0.11.0 client.  Same stack trace and symptoms.  Caused a major issue in production.

In 0.11.0 there is no timeout for this await().  A timeout was added by KAFKA-6446 in 2.x, which also appears to fix the deadlock: instead of hanging, an error would be thrown. (Fixed/dup?)

KAFKA-6446 also suggests the root cause – *the broker is either unavailable or has stopped responding to InitProducerIdRequests.*
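To make this failure visible rather than a silent hang, something like the following should work on a 2.x client (a minimal sketch, assuming initTransactions() is bounded by max.block.ms as described in KAFKA-6446; the broker address and transactional id below are placeholders):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitTransactionsWithTimeout {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker.example.com:9093"); // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-tx-id");            // placeholder
        // Bound how long blocking calls (including initTransactions) may wait for the coordinator.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");

        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            // On 0.11.0 this can park forever on a CountDownLatch (see the client stack trace below);
            // on a 2.x client it should fail with TimeoutException once max.block.ms elapses.
            producer.initTransactions();
        } catch (TimeoutException e) {
            // The coordinator never answered the InitProducerIdRequest; surface it instead of hanging.
            System.err.println("initTransactions timed out: " + e.getMessage());
        }
    }
}
{code}
In Streams the producer is created internally, but on a client version that enforces the timeout the same max.block.ms setting should be passable through the producer config.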

Very interesting error on the broker that immediately preceded the outage – *"failed: this should not happen"* – followed by a transaction metadata error.  Eventually we identified that this particular broker was in a bad state, restarted it, and the problem resolved immediately.  The broker is running CP 4.1.2, which is Apache Kafka 1.1.1-cp1 (not sure what all is in the -cp1 patch).  It looks like this particular tx metadata error is thrown when validating the producer epoch, which suggests either the broker or the producer timed out.  This particular broker is on a VM, and we've seen pause/performance issues in the past that can leave a broker in a bad state after timing out.  (We have never seen an issue with the Transaction Coordinator before, but plenty of issues with ZooKeeper and Group Coordinators on VMs.)
{code:java}
February 15th 2019, 04:32:15.538 xxxxxx_broker_xxxxxxx.example.com [2019-02-15 12:32:08,191] ERROR TransactionMetadata(transactionalId=xxxxxx_group_xxxxxxx--0_1, producerId=106000, producerEpoch=427, txnTimeoutMs=60000, state=CompleteCommit, pendingState=Some(Ongoing), topicPartitions=Set(), txnStartTimestamp=1550233928711, txnLastUpdateTimestamp=1550233927976)'s transition to TxnTransitMetadata(producerId=106000, producerEpoch=427, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(xxxxxx_topic_xxxxxxx-1), txnStartTimestamp=1550233928001, txnLastUpdateTimestamp=1550233928001) failed: this should not happen (kafka.coordinator.transaction.TransactionMetadata)
February 15th 2019, 04:32:15.538 xxxxxx_broker_xxxxxxx.example.com [2019-02-15 12:32:08,239] ERROR [KafkaApi-1] Error when handling request {replica_id=103,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=647691730,epoch=6210667,topics=[{topic=__transaction_state,partitions=[{partition=25,fetch_offset=32613278,log_start_offset=0,max_bytes=10485760}]}],forgetten_topics_data=[]} (kafka.server.KafkaApis)
java.lang.IllegalStateException: TransactionalId xxxxxx_group_xxxxxxx-0_1 failed transition to state TxnTransitMetadata(producerId=106000, producerEpoch=427, txnTimeoutMs=60000, txnState=Ongoing, topicPartitions=Set(xxxxxx_topic_xxxxxxx-1), txnStartTimestamp=1550233928001, txnLastUpdateTimestamp=1550233928001) due to unexpected metadata
    at kafka.coordinator.transaction.TransactionMetadata.throwStateTransitionFailure(TransactionMetadata.scala:390)
    at kafka.coordinator.transaction.TransactionMetadata.completeTransitionTo(TransactionMetadata.scala:326)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply$mcV$sp(TransactionStateManager.scala:534)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply(TransactionStateManager.scala:526)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply(TransactionStateManager.scala:526)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
    at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1(TransactionStateManager.scala:525)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$11.apply(TransactionStateManager.scala:620)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$11.apply(TransactionStateManager.scala:620)
    at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
    at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
    at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
    at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
    at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
    at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
    at kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:289)
    at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:503)
    at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:355)
    at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1351)
    at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:1345)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:1345)
    at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:814)
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:818)
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:629)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:112)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Thread.java:748)
{code}
Debug logs on the client during the deadlock, after enabling DEBUG logging for "org.apache.kafka.clients.producer.internals.Sender" (thanks for the tip!): repeated InitProducerIdRequests to the same broker on which the strange error above occurred.  The problem resolved immediately after restarting that broker.  This was logged repeatedly, *every 100ms*:
{code}
DEBUG internals.Sender [TransactionalId xxxxxx_group_xxxxxxx-0_1] Sending transactional request (type=InitProducerIdRequest, transactionalId=xxxxxx_group_xxxxxxx-0_1, transactionTimeoutMs=60000) to node xxxxxx_broker_xxxxxxx.example.com:9093 (id: 1 rack: null)
{code}
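For anyone else chasing this, enabling that logger is a one-line change (a sketch assuming a log4j 1.x properties file; adapt to whatever logging backend the application uses):
{code}
# Log every transactional request the producer's Sender issues, including the InitProducerIdRequest retries
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=DEBUG
{code}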
(Aside from timing out, shouldn't there be a warning or something if transaction requests fail?)

Stack trace with 0.11.0 client:
{code:java}
java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000000070cc0d148> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:533)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:143)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
{code}
EDIT: sorry for terrible formatting.  Thanks Jira editor.



> Kafka streams thread waits infinitely on transaction init
> ---------------------------------------------------------
>
>                 Key: KAFKA-7088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7088
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.1
>         Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 20:07:43 UTC 2018 
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 10000
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
>            Reporter: Lukasz Gluchowski
>            Priority: Major
>              Labels: eos
>
> A Kafka Streams application thread stops processing without any feedback. The topic has 24 partitions and I noticed that processing stopped only for some partitions. I will describe what happened to partition 10. The application is still running (now for about 8 hours), that thread is hanging there, and no rebalancing has taken place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler`, which was not called). I noticed that after a couple of minutes the stream stopped processing (at offset 32606948, where the log-end-offset is 33472402). 
> The broker itself is not reporting any active consumer in that consumer group, and the only info I was able to gather was from a thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" #113 prio=5 os_prio=0 tid=0x00007fe07c6349b0 nid=0xf7a waiting on condition [0x00007fe0215d4000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000fec6a2f8> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
> at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
> at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
> at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
> at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744){code}
>  
> I tried restarting the application once but the situation repeated. The thread read some data, committed the offset, and stopped processing, leaving that thread in a wait state.
> FYI: we have EOS enabled



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)