Posted to issues@spark.apache.org by "Ruslan Taran (JIRA)" <ji...@apache.org> on 2019/05/23 11:42:00 UTC
[jira] [Updated] (SPARK-27818) Spark Structured Streaming executors fails with OutOfMemoryError due to KafkaMbeans
[ https://issues.apache.org/jira/browse/SPARK-27818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ruslan Taran updated SPARK-27818:
---------------------------------
Description:
Checking the heap allocation with VisualVM shows that the memory used by the JMX MBean server grows linearly over time.
After further investigation, it appears that the JMX MBean server is filled with KafkaMbean instances holding metrics for client IDs of the form consumer-\d+. The number of such client IDs runs into the thousands and matches the number of tasks created on the executor.
{code:java}
$KafkaMbean.objectName._canonicalName = kafka.consumer:client-id=consumer-\d+,type=consumer-metrics
{code}
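The growth can also be confirmed from inside the executor JVM, without taking a heap dump, by querying the platform MBean server for the consumer-metrics object names shown above. The following is a minimal sketch. It assumes the Kafka client registers these MBeans in the platform MBean server under the kafka.consumer domain, as the object name above suggests. KafkaConsumerMBeanProbe and registeredClientIds are hypothetical names, not part of Spark or the Kafka client. If the MBeans are leaking, the count should keep rising when the probe is called periodically.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.collection.JavaConverters._

// Hypothetical diagnostic helper; not part of Spark or the Kafka client.
object KafkaConsumerMBeanProbe {
  // Property-list pattern: domain kafka.consumer, type=consumer-metrics,
  // plus any other key properties (for example client-id=consumer-42).
  private val ConsumerMetricsPattern =
    new ObjectName("kafka.consumer:type=consumer-metrics,*")

  /** Client ids of every consumer-metrics MBean currently registered in this JVM. */
  def registeredClientIds(): Seq[String] = {
    val server = ManagementFactory.getPlatformMBeanServer
    server
      .queryNames(ConsumerMetricsPattern, null) // null means no extra query filter
      .asScala
      .toSeq
      .map(_.getKeyProperty("client-id"))
  }

  def main(args: Array[String]): Unit = {
    val ids = registeredClientIds()
    println(s"${ids.size} consumer-metrics MBeans registered")
  }
}
```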
Running the Kafka consumer with DEBUG logging on the executor shows that the executor adds thousands of metric sensors and often removes none of them, or only some.
I would expect the KafkaMbeans to be cleaned up once the task has completed.
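The cleanup the report expects can be illustrated with a self-contained simulation. Each simulated task registers one MBean per consumer client id. Only the variant that unregisters the MBean when the task finishes keeps the count flat. This is a sketch and not Kafka's or Spark's code: the sim.consumer domain, the ConsumerMetrics bean and runTask are hypothetical stand-ins. In the real client, this unregistration presumably happens when the KafkaConsumer is closed, so a consumer that is never closed would leave its MBeans behind.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Hypothetical stand-in for a per-consumer metrics bean (NOT Kafka's KafkaMbean).
// Standard MBean convention: class X implements an interface named XMBean.
trait ConsumerMetricsMBean { def getRecordsConsumed: Long }
class ConsumerMetrics extends ConsumerMetricsMBean { def getRecordsConsumed: Long = 0L }

object MBeanLeakSimulation {
  private val server = ManagementFactory.getPlatformMBeanServer
  private val pattern = new ObjectName("sim.consumer:type=consumer-metrics,*")

  /** Simulates one task: registers a per-client MBean and optionally unregisters it at the end. */
  def runTask(taskId: Int, cleanUp: Boolean): Unit = {
    val name = new ObjectName(s"sim.consumer:type=consumer-metrics,client-id=consumer-$taskId")
    server.registerMBean(new ConsumerMetrics, name)
    try {
      // A real task would poll and process records here.
    } finally {
      // The cleanup the report expects once a task completes.
      if (cleanUp) server.unregisterMBean(name)
    }
  }

  /** Number of simulated consumer-metrics MBeans still registered. */
  def registeredCount: Int = server.queryNames(pattern, null).size

  def main(args: Array[String]): Unit = {
    (1 to 1000).foreach(id => runTask(id, cleanUp = true))
    println(s"with cleanup: $registeredCount")    // prints: with cleanup: 0
    (1001 to 2000).foreach(id => runTask(id, cleanUp = false))
    println(s"without cleanup: $registeredCount") // prints: without cleanup: 1000
  }
}
```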
Here is how I initialise the Structured Streaming query:
{code:java}
import org.apache.spark.sql.streaming.Trigger

sparkSession
  .readStream
  .format("kafka")
  .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
    "subscribePattern" -> INPUT_TOPIC,
    "startingOffsets" -> "earliest",
    "failOnDataLoss" -> "false"))
  .load()                     // load() belongs to the reader and returns the streaming DataFrame
  .mapPartitions(processData) // requires an implicit Encoder for processData's output type
  .writeStream
  .format("kafka")            // the Kafka sink also needs a "topic" option or a topic column
  .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
    "checkpointLocation" -> CHECKPOINT_LOCATION))
  .queryName("Process Data")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
  .awaitTermination()
{code}
> Spark Structured Streaming executors fails with OutOfMemoryError due to KafkaMbeans
> -----------------------------------------------------------------------------------
>
> Key: SPARK-27818
> URL: https://issues.apache.org/jira/browse/SPARK-27818
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Structured Streaming
> Affects Versions: 2.3.0
> Environment: HDP 2.6.5.0-292 with Spark 2.3.0.2.6.5.0-292 and Kafka 1.0.0.2.6.5.0-292.
> Reporter: Ruslan Taran
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)