Posted to user@storm.apache.org by saurabh mimani <mi...@gmail.com> on 2019/05/09 19:15:51 UTC

Kafka spout gets stuck in one partition

We have a topology with a Kafka spout and a few bolts. The topic it
consumes from currently has two partitions.

After about 2 days of running fine, the spout starts reading very slowly
from one partition, which causes lag to build up on that partition while
the other partition keeps working fine.

We are using storm-kafka-client from this commit:
https://github.com/apache/storm/commit/6ba98b2

Kafka Spouts Lag

Id            Topic  Partition  Latest Offset  Spout Committed Offset  Lag
MyKafkaSpout  topic  0          45215277       45192386                22891
MyKafkaSpout  topic  1          45228924       45228917                7
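
The Lag column is consistent with "latest offset minus spout committed offset". A minimal sketch of that arithmetic, using only the numbers from the table above (the function and variable names are illustrative, not part of Storm):

```python
# Offsets copied from the "Kafka Spouts Lag" table above.
partitions = {
    0: {"latest": 45215277, "committed": 45192386},
    1: {"latest": 45228924, "committed": 45228917},
}


def lag(latest: int, committed: int) -> int:
    """Records written to the partition but not yet committed by the spout."""
    return latest - committed


lags = {p: lag(o["latest"], o["committed"]) for p, o in partitions.items()}
for p, value in sorted(lags.items()):
    print(f"partition {p}: lag {value}")
```

Partition 0 is roughly 23 thousand records behind, while partition 1 is essentially caught up.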

Executors for spout (All time)

Id       Uptime         Host  Port  Actions  Emitted  Transferred  Complete latency (ms)  Acked    Failed
[33-33]  2d 3h 29m 27s  box1  6700  files    1683270  1683270      65.590                 1683268  1
[34-34]  2d 3h 29m 27s  box2  6700  files    1710466  1710466      65.576                 1710465  1
[35-35]  2d 3h 29m 27s  box1  6700  files    0        0            0.000                  0        0

Logs from box 1 (with org.apache.storm.kafka.spout):


2019-05-09T23:56:58.600+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.601+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.602+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.603+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.605+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.606+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.607+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.608+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.609+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.610+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.612+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.613+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.614+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-09T23:56:58.615+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]


Along with the above, I also see a lot of the following:


2019-05-10T00:13:50.176+05:30 KafkaSpout [TRACE] No offsets to commit.
KafkaSpout{offsetManagers ={}, emitted=[]} .  <<-- *Note the empty
offsetManagers; on box2 this log line has more detail*


*I see very few "KafkaSpout [TRACE] Emitted tuple [[topic, 0" log lines on
this box compared to the other box, which probably means it is emitting less
than the other box.*



There are also a few of these:


2019-05-10T00:13:50.673+05:30 KafkaSpout [DEBUG] Offsets successfully
committed to Kafka [{topic-0=OffsetAndMetadata{offset=45191816,
metadata='{"topologyId":"topic-273-1557241744","taskId":33,"threadName":"Thread-32-MyKafkaSpout-executor[33 33]"}'}}]

2019-05-10T00:13:50.673+05:30 KafkaSpoutRetryExponentialBackoff [DEBUG]
Topic partitions with entries ready to be retried [{}]

2019-05-10T00:13:50.673+05:30 OffsetManager [TRACE]
OffsetManager{topic-partition=topic-0, committedOffset=45191816,
emittedOffsets=[45191816], ackedMsgs=[], latestEmittedOffset=45191816}

2019-05-10T00:13:50.674+05:30 OffsetManager [DEBUG] Committed [1] offsets
in the range [45191815-45191815] for topic-partition [topic-0]. Processing
will resume at [45191816] upon spout restart



Logs from box 2:

2019-05-09T23:57:26.649+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

*I also see "KafkaSpout [DEBUG] Polled [6] records from Kafka" on this box,
which does not appear on the other box.*
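
To compare how often each message shows up on the two boxes, the log lines can be tallied per logger and message. This is a rough sketch that assumes each entry sits on one line in the form "timestamp logger [LEVEL] message" (the mail client wrapped them here); the `tally` helper and the sample list are illustrative only:

```python
import re
from collections import Counter

# Timestamp, logger name, level in brackets, then the message text.
LINE = re.compile(r"^(\S+)\s+(\S+)\s+\[(\w+)\]\s+(.*)$")


def tally(lines):
    """Count log entries per (logger, message), ignoring timestamps."""
    counts = Counter()
    for line in lines:
        match = LINE.match(line.strip())
        if match:
            _, logger, _, message = match.groups()
            counts[(logger, message)] += 1
    return counts


# Sample entries taken from the box 2 logs, rejoined onto single lines.
sample = [
    "2019-05-09T23:57:26.649+05:30 KafkaSpout [DEBUG] Not polling. Tuples waiting to be emitted.",
    "2019-05-09T23:57:26.650+05:30 KafkaSpout [DEBUG] Not polling. Tuples waiting to be emitted.",
    "2019-05-10T00:13:15.888+05:30 OffsetManager [DEBUG] Topic-partition [topic-1] has no offsets ready to be committed",
]
counts = tally(sample)
for (logger, message), n in counts.most_common():
    print(n, logger, message)
```

Running the same tally over each worker's log file would show whether "Emitted tuple" and "Polled" entries really are rarer on box 1.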

A few other logs:

2019-05-10T00:13:15.888+05:30 OffsetManager [DEBUG] Topic-partition
[topic-1] has no offsets ready to be committed

2019-05-10T00:13:15.888+05:30 OffsetManager [TRACE]
OffsetManager{topic-partition=topic-1, committedOffset=45228145,
emittedOffsets=[45228145], ackedMsgs=[], latestEmittedOffset=45228145}

2019-05-10T00:13:15.888+05:30 KafkaSpout [TRACE] No offsets to commit.
KafkaSpout{offsetManagers ={topic-1=OffsetManager{topic-partition=topic-1,
committedOffset=45228145, emittedOffsets=[45228145], ackedMsgs=[],
latestEmittedOffset=45228145}}, emitted=[{topic-partition=topic-1,
offset=45228145, numFails=0, nullTuple=false}]} .  <<-- *details present
that were missing on the other box*

2019-05-10T00:13:15.888+05:30 KafkaSpout [DEBUG] Not polling. Tuples
waiting to be emitted.

This is what my topology looks like:

Spouts (All time)

Id          Executors  Tasks  Emitted  Transferred  Complete latency (ms)  Acked    Failed
KafkaSpout  3          3      3391374  3391374      65.551                 3391341  2

<http://10.33.150.189:8080/topology.html?id=Sms_sms_cp_usercluster_all-273-1557241744>

Bolts (All time)

Id          Executors  Tasks  Emitted  Transferred  Capacity (last 10m)  Execute latency (ms)  Executed  Process latency (ms)  Acked    Failed
firstBolt   2          2      3391372  3391372      0.002                0.253                 3391372   0.251                 3391372  0
thirdBolt   30         30     0        0            0.046                52.867                3391342   52.864                3391342  0
secondBolt  15         15     3391373  3391373      0.000                0.170                 3391373   0.168                 3391373  0

Error Host, Error Port, Last error, Error Time: empty for all rows.

Re: Kafka spout gets stuck in one partition

Posted by Stig Rohde Døssing <st...@gmail.com>.
From your image, you have 3 spout tasks, but only 2 Kafka partitions. This
should mean that one of your executors isn't doing anything. I think this
is why you're seeing this log line where offsetManagers is empty.

2019-05-10T00:13:50.176+05:30 KafkaSpout [TRACE] No offsets to commit.
KafkaSpout{offsetManagers ={}, emitted=[]}

You should set your task count to be equal to or less than your number of
Kafka partitions.
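
To illustrate why a third task ends up idle, here is a simplified model of handing out partitions round-robin across tasks. It is only a sketch of the idea, not the storm-kafka-client code; the function name and the 0-based task indexes are made up for the example:

```python
def assign_round_robin(partitions, num_tasks):
    """Give the partition at position i (in sorted order) to task i % num_tasks."""
    assignment = {task: [] for task in range(num_tasks)}
    for index, partition in enumerate(sorted(partitions)):
        assignment[index % num_tasks].append(partition)
    return assignment


# Two partitions, as in this topic, spread over 3 tasks and then over 2 tasks.
three_tasks = assign_round_robin(["topic-0", "topic-1"], num_tasks=3)
two_tasks = assign_round_robin(["topic-0", "topic-1"], num_tasks=2)

# Tasks that received no partition have nothing to poll, emit or commit.
idle = [task for task, parts in three_tasks.items() if not parts]
print(three_tasks)
print(two_tasks)
print("idle tasks:", idle)
```

With 3 tasks one of them gets an empty list, which matches the executor that shows 0 emitted tuples in the UI. With 2 tasks every task owns exactly one partition.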

Regarding the lag, it is hard to tell from what you've posted whether the
issue is in the spout or your topology. Unfortunately your log lines don't
contain the logging thread name, so it's hard to distinguish logs from the
do-nothing executor from the logs for the executor for topic-0. If you fix
the task count issue, it will be easier to tell from the log what the
topic-0 executor is doing.
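
One line in the box 1 logs does identify its executor: the "Offsets successfully committed to Kafka" entry carries a JSON metadata string with the task id and thread name. A small sketch of pulling those fields out, using the string exactly as posted earlier in this thread (rejoined where the mail wrapped it):

```python
import json

# Metadata string copied from the "Offsets successfully committed" log line
# in the box 1 logs.
metadata = (
    '{"topologyId":"topic-273-1557241744","taskId":33,'
    '"threadName":"Thread-32-MyKafkaSpout-executor[33 33]"}'
)

info = json.loads(metadata)
print(info["taskId"], info["threadName"])
```

Task 33 corresponds to executor [33-33] on box1 in the UI table, so that commit for topic-0 came from the active executor on that box rather than from the idle one.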
