Posted to issues@spark.apache.org by "Ruslan Taran (JIRA)" <ji...@apache.org> on 2019/05/23 11:41:00 UTC

[jira] [Created] (SPARK-27818) Spark Structured Streaming executors fail with OutOfMemoryError due to KafkaMbeans

Ruslan Taran created SPARK-27818:
------------------------------------

             Summary: Spark Structured Streaming executors fail with OutOfMemoryError due to KafkaMbeans
                 Key: SPARK-27818
                 URL: https://issues.apache.org/jira/browse/SPARK-27818
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, Structured Streaming
    Affects Versions: 2.3.0
         Environment: HDP 2.6.5.0-292 with Spark 2.3.0.2.6.5.0-292 and Kafka 1.0.0.2.6.5.0-292.
            Reporter: Ruslan Taran


Checking the heap allocation with VisualVM shows that the JMX MBean server's memory usage grows linearly over time.

Further investigation shows that the JMX MBean server fills up with thousands of KafkaMbean instances holding metrics for client IDs matching consumer-\d+ (one per task created on the executor).
{code:java}
$KafkaMbean.objectName._canonicalName = kafka.consumer:client-id=consumer-\d+,type=consumer-metrics
{code}
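To make the growth visible without a profiler, the matching registrations can also be counted directly from the executor JVM. This is only an illustrative sketch against the standard JMX API; the ObjectName pattern simply mirrors the canonical name above.

{code:scala}
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.collection.JavaConverters._

// Count the per-consumer KafkaMbean registrations on this JVM's platform MBean server.
// The pattern mirrors the canonical name observed above (client-id=consumer-<n>).
val server  = ManagementFactory.getPlatformMBeanServer
val pattern = new ObjectName("kafka.consumer:type=consumer-metrics,client-id=consumer-*")
val leaked  = server.queryNames(pattern, null).asScala

println(s"KafkaMbean instances currently registered: ${leaked.size}")
{code}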
 

Running the Kafka consumer with DEBUG logging on the executor shows that the executor registers thousands of metrics sensors and frequently removes only some of them, or none at all.

I would expect the KafkaMbeans to be cleaned up once the task has completed.
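For comparison, this is what a plain Kafka consumer does outside Spark: the client registers its metrics MBeans through JmxReporter when it is constructed and removes them again when it is closed. A minimal sketch (broker and group values are placeholders) of the lifecycle I would expect per task:

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

// Placeholder settings, for illustration only.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "mbean-lifecycle-check")
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

// Constructing the consumer registers kafka.consumer:type=consumer-metrics,client-id=consumer-<n>.
val consumer = new KafkaConsumer[String, String](props)

// Closing it unregisters those MBeans again; a consumer that is never closed leaves
// its KafkaMbean behind, which is what appears to accumulate on the executors here.
consumer.close()
{code}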

 

Here is how I initialise structured streaming: 

 

{code:scala}
// KAFKA_BROKERS, INPUT_TOPIC, CHECKPOINT_LOCATION and processData are defined elsewhere.
import org.apache.spark.sql.streaming.Trigger
import sparkSession.implicits._

sparkSession
  .readStream
  .format("kafka")
  .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
               "subscribePattern" -> INPUT_TOPIC,
               "startingOffsets" -> "earliest",
               "failOnDataLoss" -> "false"))
  .load()
  .mapPartitions(processData)
  .writeStream
  .format("kafka")
  .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
               "checkpointLocation" -> CHECKPOINT_LOCATION))
  .queryName("Process Data")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
  .awaitTermination()
{code}
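Until the root cause is fixed, the only stop-gap I can think of on the executor side is to drop the leaked registrations manually. This is purely a hypothetical sketch against the standard JMX API (the helper name and any scheduling of it are made up; nothing here is a Spark or Kafka configuration option), just to illustrate what cleaning the KafkaMbeans would amount to:

{code:scala}
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.collection.JavaConverters._

// Hypothetical helper: unregister the per-task consumer MBeans that were left behind.
// It only touches the kafka.consumer domain, using the canonical name pattern from above.
// Note: it cannot tell live consumers from leaked ones, so it is illustrative only.
def purgeLeakedKafkaMbeans(): Int = {
  val server  = ManagementFactory.getPlatformMBeanServer
  val pattern = new ObjectName("kafka.consumer:type=consumer-metrics,client-id=consumer-*")
  val names   = server.queryNames(pattern, null).asScala
  names.foreach(server.unregisterMBean)
  names.size
}
{code}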

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org