Posted to user@spark.apache.org by Apoorva Sareen <ap...@gmail.com> on 2015/07/13 20:05:53 UTC

Spark off heap memory leak on Yarn with Kafka direct stream

Hi,

I am running Spark Streaming 1.4.0 on YARN (Apache distribution 2.6.0) with Java 1.8.0_45, using the Kafka direct stream. Spark is built with Scala 2.11 support.

The issue is that both the driver and executor containers gradually increase their physical memory usage until YARN kills the container. I have configured up to 192M of heap and 384M of off-heap space for the driver, but it eventually runs out.
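
As a rough illustration of the limits involved: YARN enforces its physical memory limit on the whole container, which Spark requests as the JVM heap plus the configured memoryOverhead. The sketch below only does that arithmetic for the settings in this post. The function name container_limit_mb is hypothetical, and it assumes no rounding by YARN's allocation increment, which may raise the actual figure.

```shell
# Hypothetical helper: container request (MB) = JVM heap (MB) + memoryOverhead (MB).
# Assumption: any rounding YARN applies to its allocation increment is not modelled.
container_limit_mb() {
  echo $(( $1 + $2 ))
}

container_limit_mb 192 384   # driver: --driver-memory 192m, driver memoryOverhead 384
container_limit_mb 128 256   # executor: --executor-memory 128m, executor memoryOverhead 256
```

Under these assumptions the driver container is requested at 576 MB and the executor container at 384 MB, so any steady growth in non-heap memory only has to exhaust the overhead portion before the container is killed.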

The heap memory appears to be fine, with regular GC cycles. No OutOfMemoryError is ever encountered in any of these runs.

In fact, this happens even though I am not generating any traffic on the Kafka topic. Here is the code I am using:

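import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SimpleSparkStreaming extends App {

  val conf = new SparkConf()
  // Batch interval in seconds, read from a custom conf key (defaults to 1)
  val ssc = new StreamingContext(conf, Seconds(conf.getLong("spark.batch.window.size", 1L)))
  ssc.checkpoint("checkpoint")

  val topics = Set(conf.get("spark.kafka.topic.name"))
  val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

  // Print each message value on the executors
  kafkaStream.foreachRDD(rdd => {
    rdd.foreach(x => {
      println(x._2)
    })
  })
  // Print the first few records of each batch on the driver
  kafkaStream.print()

  ssc.start()
  ssc.awaitTermination()

}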

I am running this on CentOS 7. The spark-submit command is as follows:

./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf spark.app.name="Simple Spark Kafka application" \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 192m \
--executor-memory 128m \
--executor-cores 1 \
/home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar 

Any help is greatly appreciated.

Regards,

Apoorva

Re: Spark off heap memory leak on Yarn with Kafka direct stream

Posted by Apoorva Sareen <ap...@gmail.com>.
It happens whether or not there is traffic on the Kafka topic. Also, I could see no clue in the heap: it looks healthy and stable. It is something off heap that is constantly growing. I also checked the JNI reference count from the dumps, which appears stable (the references are constantly being GCed), and tried to limit the size of metaspace and direct memory using the following:

--conf spark.driver.extraJavaOptions="-XX:MaxMetaspaceSize=128M -XX:MaxDirectMemorySize=128M" \

but with no success. Thanks for offering to help.

Regards,
Apoorva

> On 14-Jul-2015, at 12:43 am, Cody Koeninger <co...@koeninger.org> wrote:
> 
> Does the issue only happen when you have no traffic on the topic?
> 
> Have you profiled to see what's using heap space?


Re: Spark off heap memory leak on Yarn with Kafka direct stream

Posted by Cody Koeninger <co...@koeninger.org>.
Does the issue only happen when you have no traffic on the topic?

Have you profiled to see what's using heap space?
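
Alongside heap profiling, one complementary check is to sample the resident set size (RSS) of the driver or executor JVM on its node and compare it over time with the container limit. The following is only a sketch under stated assumptions: rss_kb is a hypothetical helper, it relies on the Linux /proc filesystem, and the JVM's PID has to be found separately on the NodeManager host.

```shell
# Hypothetical helper: print the resident set size (in kB) of a process,
# read from the VmRSS field of /proc/<pid>/status (Linux only).
rss_kb() {
  awk '/^VmRSS:/ {print $2}' "/proc/$1/status"
}

# Example: sample this shell's own RSS; substitute the JVM's PID in practice.
rss_kb $$
```

If the sampled value keeps climbing while the heap stays flat, the growth is coming from memory outside the Java heap.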

