Posted to issues@spark.apache.org by "junzhang (Jira)" <ji...@apache.org> on 2019/08/21 03:14:00 UTC

[jira] [Comment Edited] (SPARK-20780) Spark Kafka10 Consumer Hangs

    [ https://issues.apache.org/jira/browse/SPARK-20780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911905#comment-16911905 ] 

junzhang edited comment on SPARK-20780 at 8/21/19 3:13 AM:
-----------------------------------------------------------

[~Kevin_HW] We are facing the same problem. Could you please point out the related JIRA that fixes this bug in Kafka 0.10.1.0?



> Spark Kafka10 Consumer Hangs
> ----------------------------
>
>                 Key: SPARK-20780
>                 URL: https://issues.apache.org/jira/browse/SPARK-20780
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>         Environment: Spark 2.1.0
> Spark Streaming Kafka 010
> Yarn - Cluster Mode
> CDH 5.8.4
> CentOS Linux release 7.2
>            Reporter: jayadeepj
>            Priority: Major
>         Attachments: streaming_1.png, streaming_2.png, tasks_timing_out_3.png
>
>
> We have recently upgraded our streaming app to Spark 2 with the direct stream (spark-streaming-kafka-0-10 2.1.0), Kafka 0.10.0.0, and the 0.10 consumer. We see abnormal delays after the application has run for a couple of hours or has consumed roughly 5 million records.
> See screenshots 1 & 2.
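> For reference, a minimal sketch of the kind of direct-stream setup involved (batch interval and processing body are placeholders, not our actual job; the kafkaParams values match the ConsumerConfig dump further below):
>     import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
>     import org.apache.spark.SparkConf
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>     import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>     import org.apache.spark.streaming.kafka010.KafkaUtils
>     import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> 
>     val ssc = new StreamingContext(new SparkConf().setAppName("streaming-app"), Seconds(15))
> 
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "xxxxx.xxx.xxx:9092",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[ByteArrayDeserializer],
>       "group.id" -> "default1",
>       "enable.auto.commit" -> (true: java.lang.Boolean),
>       // the timeout that the slow tasks hit exactly
>       "request.timeout.ms" -> (50000: java.lang.Integer)
>     )
> 
>     val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
>       ssc, PreferConsistent, Subscribe[String, Array[Byte]](Seq("XX-XXX-XX"), kafkaParams))
> 
>     stream.foreachRDD { rdd => println(rdd.count()) }  // placeholder processing
>     ssc.start()
>     ssc.awaitTermination()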
> There is a sudden jump in processing time from ~15 seconds (usual for this app) to ~3 minutes, and from then on the processing time keeps degrading.
> We have seen that the delay comes from certain tasks taking exactly the duration configured in the Kafka consumer's 'request.timeout.ms'. We verified this by varying the timeout property across different values, as in the override sketched below.
> See screenshot 3.
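> The override we varied is just the consumer property, building on the sketch above (values illustrative):
>     // Vary the consumer request timeout and watch the straggler task durations follow it.
>     // Note Kafka requires request.timeout.ms to be larger than session.timeout.ms and
>     // fetch.max.wait.ms, so keep those in range when experimenting.
>     val tunedParams = kafkaParams ++ Map[String, Object](
>       "request.timeout.ms" -> (50000: java.lang.Integer),   // also tried other values
>       "session.timeout.ms" -> (30000: java.lang.Integer),
>       "heartbeat.interval.ms" -> (3000: java.lang.Integer)
>     )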
> I think the get(offset: Long, timeout: Long): ConsumerRecord[K, V] method and the subsequent poll(timeout) call in CachedKafkaConsumer.scala (sketched below) are timing out on some of the partitions without reading data, yet the executor logs those tasks as successfully completed after exactly the timeout duration. Note that most other tasks complete within milliseconds. The timeout most likely comes from org.apache.kafka.clients.consumer.KafkaConsumer itself; we did not observe any difference in network latency.
> We have observed this across multiple clusters and multiple apps, with and without TLS/SSL. Spark 1.6 with the 0-8 consumer seems fine, with consistent performance.
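> For context, the path I mean is roughly the following (simplified from Spark 2.1.0's CachedKafkaConsumer.scala; logging and the buffer-miss retry are omitted). The "Initial fetch" line in the executor log below comes from the seek-then-poll branch:
>     def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>       if (offset != nextOffset) {
>         seek(offset)    // consumer.seek(topicPartition, offset) -- the "Initial fetch" case
>         poll(timeout)
>       }
>       if (!buffer.hasNext()) { poll(timeout) }
>       assert(buffer.hasNext(),
>         s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>       val record = buffer.next()
>       nextOffset = offset + 1
>       record
>     }
> 
>     private def poll(timeout: Long): Unit = {
>       // Delegates to org.apache.kafka.clients.consumer.KafkaConsumer#poll, which is
>       // where we suspect the time is going.
>       val p = consumer.poll(timeout)
>       buffer = p.records(topicPartition).listIterator
>     }
> If I read KafkaRDD correctly, the timeout passed in here comes from the Spark setting spark.streaming.kafka.consumer.poll.ms, a separate knob from the Kafka-level request.timeout.ms; the 50-second task durations matching the latter suggest the stall is inside the Kafka client's poll rather than in Spark's wrapper.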
> 17/05/17 10:30:06 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446288
> 17/05/17 10:30:06 INFO executor.Executor: Running task 11.0 in stage 5663.0 (TID 446288)
> 17/05/17 10:30:06 INFO kafka010.KafkaRDD: Computing topic XX-XXX-XX, partition 0 offsets 776843 -> 779591
> 17/05/17 10:30:06 INFO kafka010.CachedKafkaConsumer: Initial fetch for spark-executor-default1 XX-XXX-XX 0 776843
> 17/05/17 10:30:56 INFO executor.Executor: Finished task 11.0 in stage 5663.0 (TID 446288). 1699 bytes result sent to driver
> 17/05/17 10:30:56 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 446329
> 17/05/17 10:30:56 INFO executor.Executor: Running task 0.0 in stage 5667.0 (TID 446329)
> 17/05/17 10:30:56 INFO spark.MapOutputTrackerWorker: Updating epoch to 3116 and clearing cache
> 17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 6807
> 17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807_piece0 stored as bytes in memory (estimated size 13.1 KB, free 4.1 GB)
> 17/05/17 10:30:56 INFO broadcast.TorrentBroadcast: Reading broadcast variable 6807 took 4 ms
> 17/05/17 10:30:56 INFO memory.MemoryStore: Block broadcast_6807 stored as values in m
> We can see that the task start and finish log statements differ by exactly the configured timeout duration (10:30:06 to 10:30:56 above, i.e. the 50-second request.timeout.ms).
> Our consumer config is below.
> 17/05/17 12:33:13 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1171dde4
> 17/05/17 12:33:13 INFO consumer.ConsumerConfig: ConsumerConfig values:
> 	metric.reporters = []
> 	metadata.max.age.ms = 300000
> 	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> 	reconnect.backoff.ms = 50
> 	sasl.kerberos.ticket.renew.window.factor = 0.8
> 	max.partition.fetch.bytes = 1048576
> 	bootstrap.servers = [xxxxx.xxx.xxx:9092]
> 	ssl.keystore.type = JKS
> 	enable.auto.commit = true
> 	sasl.mechanism = GSSAPI
> 	interceptor.classes = null
> 	exclude.internal.topics = true
> 	ssl.truststore.password = null
> 	client.id =
> 	ssl.endpoint.identification.algorithm = null
> 	max.poll.records = 2147483647
> 	check.crcs = true
> 	request.timeout.ms = 50000
> 	heartbeat.interval.ms = 3000
> 	auto.commit.interval.ms = 5000
> 	receive.buffer.bytes = 65536
> 	ssl.truststore.type = JKS
> 	ssl.truststore.location = null
> 	ssl.keystore.password = null
> 	fetch.min.bytes = 1
> 	send.buffer.bytes = 131072
> 	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> 	group.id = default1
> 	retry.backoff.ms = 100
> 	ssl.secure.random.implementation = null
> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 	sasl.kerberos.service.name = null
> 	sasl.kerberos.ticket.renew.jitter = 0.05
> 	ssl.trustmanager.algorithm = PKIX
> 	ssl.key.password = null
> 	fetch.max.wait.ms = 500
> 	sasl.kerberos.min.time.before.relogin = 60000
> 	connections.max.idle.ms = 540000
> 	session.timeout.ms = 30000
> 	metrics.num.samples = 2
> 	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> 	ssl.protocol = TLS
> 	ssl.provider = null
> 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> 	ssl.keystore.location = null
> 	ssl.cipher.suites = null
> 	security.protocol = PLAINTEXT
> 	ssl.keymanager.algorithm = SunX509
> 	metrics.sample.window.ms = 30000
> 	auto.offset.reset = latest


