You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Ruslan Taran (JIRA)" <ji...@apache.org> on 2019/05/23 11:42:00 UTC

[jira] [Updated] (SPARK-27818) Spark Structured Streaming executors fails with OutOfMemoryError due to KafkaMbeans

     [ https://issues.apache.org/jira/browse/SPARK-27818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ruslan Taran updated SPARK-27818:
---------------------------------
    Description: 
Checking the heap allocation with VirtualVM indicates that JMX Mbean Server memory usage grows linearly with time.

After a further investigation it seems that JMX Mbean Server is filled with thousands of instances of KafkaMbean objects with metrics for consumer-\d+ that goes into thousands (equal to the number of tasks created on the executor).
{code:java}
$KafkaMbean.objectName._canonicalName = kafka.consumer:client-id=consumer-\d+,type=consumer-metrics
{code}
 

Running Kafka consumer with DEBUG logs on the executor shows that the executor adds thousands of metrics sensors and often does not remove them at all or only removes some.

I would expect KafkaMbeans to be cleaned once the task has been completed.

 

Here is how I initialise structured streaming: 

 

{code:java}
sparkSession
 .readStream
 .format("kafka")
 .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
 "subscribePattern" -> INPUT_TOPIC,
 "startingOffsets" -> "earliest",
 "failOnDataLoss" -> "false"))
 .mapPartitions(processData)
 .writeStream
 .format("kafka")
 .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS, 
 "checkpointLocation" -> CHECKPOINT_LOCATION))
 .queryName("Process Data") .outputMode("update")
 .trigger(Trigger.ProcessingTime(1000))
 .load()
 .start()
 .awaitTermination()
{code}
 

  was:
Checking the heap allocation with VirtualVM indicates that JMX Mbean Server memory usage grows linearly with time.

After a further investigation it seems that JMX Mbean Server is filled with thousands of instances of KafkaMbean objects with metrics for consumer-\d+ that goes into thousands (equal to the number of tasks created on the executor).
{code:java}
$KafkaMbean.objectName._canonicalName = kafka.consumer:client-id=consumer-\d+,type=consumer-metrics
{code}
 

Running Kafka consumer with DEBUG logs on the executor shows that the executor adds thousands of metrics sensors and often does not remove them at all or only removes some.

I would expect KafkaMbeans to be cleaned once the task has been completed.

 

Here is how I initialise structured streaming: 

 

{{sparkSession}}
{{  .readStream}}
{{  .format("kafka")}}
{{  .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,}}
{{               "subscribePattern" -> INPUT_TOPIC,}}
{{               "startingOffsets" -> "earliest",}}
{{               "failOnDataLoss" -> "false"))}}
{{  .mapPartitions(processData)}}
{{  .writeStream}}
{{  .format("kafka")}}
{{  .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,   }}
{{               "checkpointLocation" -> CHECKPOINT_LOCATION))}}
{{  .queryName("Process Data") .outputMode("update")}}
{{  .trigger(Trigger.ProcessingTime(1000))}}
{{  .load()}}
{{  .start()}}
{{  .awaitTermination()}}

 

 


> Spark Structured Streaming executors fails with OutOfMemoryError due to KafkaMbeans
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-27818
>                 URL: https://issues.apache.org/jira/browse/SPARK-27818
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Structured Streaming
>    Affects Versions: 2.3.0
>         Environment: HDP 2.6.5.0-292 with Spark 2.3.0.2.6.5.0-292 and Kafka 1.0.0.2.6.5.0-292.
>            Reporter: Ruslan Taran
>            Priority: Major
>
> Checking the heap allocation with VirtualVM indicates that JMX Mbean Server memory usage grows linearly with time.
> After a further investigation it seems that JMX Mbean Server is filled with thousands of instances of KafkaMbean objects with metrics for consumer-\d+ that goes into thousands (equal to the number of tasks created on the executor).
> {code:java}
> $KafkaMbean.objectName._canonicalName = kafka.consumer:client-id=consumer-\d+,type=consumer-metrics
> {code}
>  
> Running Kafka consumer with DEBUG logs on the executor shows that the executor adds thousands of metrics sensors and often does not remove them at all or only removes some.
> I would expect KafkaMbeans to be cleaned once the task has been completed.
>  
> Here is how I initialise structured streaming: 
>  
> {code:java}
> sparkSession
>  .readStream
>  .format("kafka")
>  .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
>  "subscribePattern" -> INPUT_TOPIC,
>  "startingOffsets" -> "earliest",
>  "failOnDataLoss" -> "false"))
>  .mapPartitions(processData)
>  .writeStream
>  .format("kafka")
>  .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS, 
>  "checkpointLocation" -> CHECKPOINT_LOCATION))
>  .queryName("Process Data") .outputMode("update")
>  .trigger(Trigger.ProcessingTime(1000))
>  .load()
>  .start()
>  .awaitTermination()
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org