You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by darin <li...@foxmail.com> on 2017/03/17 01:59:53 UTC

spark streaming exectors memory increasing and executor killed by yarn

Hi,
I got this exception when streaming program run some hours.

```
*User class threw exception: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.*
```

I have googled some solutions like close yarn memory monitor ,increasing
exector memory... .I think it is not the right way .


And this is the submit script:
```
*spark-submit --master yarn-cluster --driver-cores 1 --driver-memory 1G
--num-executors 6 --executor-cores 3 --executor-memory 3G --conf
"spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" --conf
"spark.kryoserializer.buffer.max=512m" --class
com.dtise.data.streaming.ad.DTStreamingStatistics
hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar*
```

And This is the main codes:

```
val originalStream = ssc.textFileStream(rawDataPath)
    originalStream.repartition(10).mapPartitions(parseAdLog).reduceByKey(_
++ _)
      .mapWithState(StateSpec.function(countAdLogWithState
_)).foreachRDD(rdd => {
        if (!rdd.isEmpty()) {
          val batchTime = Calendar.getInstance.getTimeInMillis
          val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
          val nameList = rdd.map(_._2).reduce(_ ++ _).toList
          val jedis = RedisUtils.jedis()
          jedis.hmset(joinString("t_ad_dimension_sum", batchTime),
dimensionSumMap)
          jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
          jedis.set(joinString("t_ad", batchTime.toString), "OK")
          jedis.close()

          rdd.flatMap(_._3).foreachPartition(logInfoList => {
            val producter = new StringProducter
            for (logInfo <- logInfoList) {
              val logInfoArr = logInfo.split("\t", -1)
              val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" +
logInfoArr(logDateIdx)
              producter.send("cookedLog", kafkaKey, logInfo)
            }
            producter.close()
          })
        }
      })
```

These are jvm heap mat results 

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095238%402x.png> 
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095254%402x.png> 
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095331%402x.png> 

/*Anybody has any advice about this ?
Thanks*/





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: spark streaming exectors memory increasing and executor killed by yarn

Posted by Yong Zhang <ja...@hotmail.com>.
In this kind of question, you always want to tell us the spark version.


Yong


________________________________
From: darin <li...@foxmail.com>
Sent: Thursday, March 16, 2017 9:59 PM
To: user@spark.apache.org
Subject: spark streaming exectors memory increasing and executor killed by yarn

Hi,
I got this exception when streaming program run some hours.

```
*User class threw exception: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.*
```

I have googled some solutions like close yarn memory monitor ,increasing
exector memory... .I think it is not the right way .


And this is the submit script:
```
*spark-submit --master yarn-cluster --driver-cores 1 --driver-memory 1G
--num-executors 6 --executor-cores 3 --executor-memory 3G --conf
"spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" --conf
"spark.kryoserializer.buffer.max=512m" --class
com.dtise.data.streaming.ad.DTStreamingStatistics
hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar*
```

And This is the main codes:

```
val originalStream = ssc.textFileStream(rawDataPath)
    originalStream.repartition(10).mapPartitions(parseAdLog).reduceByKey(_
++ _)
      .mapWithState(StateSpec.function(countAdLogWithState
_)).foreachRDD(rdd => {
        if (!rdd.isEmpty()) {
          val batchTime = Calendar.getInstance.getTimeInMillis
          val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
          val nameList = rdd.map(_._2).reduce(_ ++ _).toList
          val jedis = RedisUtils.jedis()
          jedis.hmset(joinString("t_ad_dimension_sum", batchTime),
dimensionSumMap)
          jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
          jedis.set(joinString("t_ad", batchTime.toString), "OK")
          jedis.close()

          rdd.flatMap(_._3).foreachPartition(logInfoList => {
            val producter = new StringProducter
            for (logInfo <- logInfoList) {
              val logInfoArr = logInfo.split("\t", -1)
              val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" +
logInfoArr(logDateIdx)
              producter.send("cookedLog", kafkaKey, logInfo)
            }
            producter.close()
          })
        }
      })
```

These are jvm heap mat results

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095238%402x.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095254%402x.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095331%402x.png>

/*Anybody has any advice about this ?
Thanks*/





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


[http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095238%402x.png]

[http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095254%402x.png]

[http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095331%402x.png]

Re: spark streaming exectors memory increasing and executor killed by yarn

Posted by Bill Schwanitz <bi...@bilsch.org>.
I have had similar issues with some of my spark jobs especially doing 
things like repartitioning.

spark.yarn.driver.memoryOverhead	driverMemory * 0.10, with minimum of 
384	The amount of off-heap memory (in megabytes) to be allocated per 
driver in cluster mode. This is memory that accounts for things like VM 
overheads, interned strings, other native overheads, etc. This tends to 
grow with the container size (typically 6-10%).

I bumped the overhead memory as a way to work around the issue. Not sure 
if that is the best way but its how I got around it ;)

darin wrote:
> Hi,
> I got this exception when streaming program run some hours.
>
> ```
> *User class threw exception: org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
> failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
> ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
> Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
> GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.*
> ```
>
> I have googled some solutions like close yarn memory monitor ,increasing
> exector memory... .I think it is not the right way .
>
>
> And this is the submit script:
> ```
> *spark-submit --master yarn-cluster --driver-cores 1 --driver-memory 1G
> --num-executors 6 --executor-cores 3 --executor-memory 3G --conf
> "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC
> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" --conf
> "spark.kryoserializer.buffer.max=512m" --class
> com.dtise.data.streaming.ad.DTStreamingStatistics
> hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar*
> ```
>
> And This is the main codes:
>
> ```
> val originalStream = ssc.textFileStream(rawDataPath)
>      originalStream.repartition(10).mapPartitions(parseAdLog).reduceByKey(_
> ++ _)
>        .mapWithState(StateSpec.function(countAdLogWithState
> _)).foreachRDD(rdd =>  {
>          if (!rdd.isEmpty()) {
>            val batchTime = Calendar.getInstance.getTimeInMillis
>            val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
>            val nameList = rdd.map(_._2).reduce(_ ++ _).toList
>            val jedis = RedisUtils.jedis()
>            jedis.hmset(joinString("t_ad_dimension_sum", batchTime),
> dimensionSumMap)
>            jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
>            jedis.set(joinString("t_ad", batchTime.toString), "OK")
>            jedis.close()
>
>            rdd.flatMap(_._3).foreachPartition(logInfoList =>  {
>              val producter = new StringProducter
>              for (logInfo<- logInfoList) {
>                val logInfoArr = logInfo.split("\t", -1)
>                val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" +
> logInfoArr(logDateIdx)
>                producter.send("cookedLog", kafkaKey, logInfo)
>              }
>              producter.close()
>            })
>          }
>        })
> ```
>
> These are jvm heap mat results
>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095238%402x.png>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095254%402x.png>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095331%402x.png>
>
> /*Anybody has any advice about this ?
> Thanks*/
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: spark streaming exectors memory increasing and executor killed by yarn

Posted by darin <li...@foxmail.com>.
I add this code in foreachRDD block .
```
rdd.persist(StorageLevel.MEMORY_AND_DISK)
```


This exception no occur agein.But many executor dead showing in spark
streaming UI .
```
User class threw exception: org.apache.spark.SparkException: Job aborted due
to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
```




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500p28506.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: spark streaming exectors memory increasing and executor killed by yarn

Posted by darin <li...@foxmail.com>.
This issue on stackoverflow maybe help

https://stackoverflow.com/questions/42641573/why-does-memory-usage-of-spark-worker-increases-with-time/42642233#42642233



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500p28512.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org