Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/01/21 17:26:40 UTC
[jira] [Resolved] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container
[ https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-6056.
------------------------------
Resolution: Not A Problem
> Unlimit offHeap memory use cause RM killing the container
> ---------------------------------------------------------
>
> Key: SPARK-6056
> URL: https://issues.apache.org/jira/browse/SPARK-6056
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, Spark Core
> Affects Versions: 1.2.1
> Reporter: SaintBacchus
>
> Whether or not `preferDirectBufs` is set, and whether or not the number of threads is limited, Spark cannot limit its use of off-heap memory.
> At line 269 of the class `AbstractNioByteChannel` in netty-4.0.23.Final, Netty allocates an off-heap buffer of the same size as the on-heap buffer it is sending.
> So however much buffer data you transfer, the same amount of off-heap memory is allocated.
> Once the allocated memory reaches the memory overhead configured for YARN, the executor is killed.
> I wrote some simple code to test it:
> {code:title=test.scala|borderStyle=solid}
> import org.apache.spark.storage._
> import org.apache.spark._
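> // Cache 11 byte arrays of 10 MB each, spread over 10 partitions.
> val bufferRdd = sc.makeRDD(0 to 10, 10).map(x=>new Array[Byte](10*1024*1024)).persist
> // Materialize the RDD so the blocks are actually cached on the executor.
> bufferRdd.count
> val part = bufferRdd.partitions(0)
> val sparkEnv = SparkEnv.get
> val blockMgr = sparkEnv.blockManager
> // Fetch the cached block of partition 0 through the driver's BlockManager
> // (a remote fetch served by the executor) and print its total length.
> def test = {
> val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
> val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
> val len = resultIt.map(_.length).sum
> println(s"[${Thread.currentThread.getId}] get block length = $len")
> }
> // Evaluate f `count` times on a fork-join pool with `parallel` threads.
> def test_driver(count:Int, parallel:Int)(f: => Unit) = {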
> val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
> val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
> val parseq = (1 to count).par
> parseq.tasksupport = taskSupport
> parseq.foreach(x=>f)
> tpool.shutdown
> tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
> }
> {code}
> Steps to reproduce:
> 1. Start the shell: bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
> 2. Run :load test.scala in spark-shell.
> 3. On the slave node, use this command to monitor the executor process:
> {code}
> pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid
> {code}
> 4. Run test_driver(20,100)(test) in spark-shell.
> 5. Watch the output of the command on the slave node.
> When multiple threads fetch the block concurrently, the executor's physical memory soon exceeds the limit set by spark.yarn.executor.memoryOverhead.
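The mechanism described in the report (a heap buffer copied into an equally sized direct buffer for every transfer in flight) can be sketched with plain JDK NIO. This is a hedged illustration, not Netty's code: `copyToDirect` only models the copy the report attributes to `AbstractNioByteChannel`, and `DirectCopyDemo`, `copyToDirect` and `measure` are hypothetical names. The JVM's `direct` BufferPoolMXBean is used to show that N concurrent copies of one block reserve at least N times the block size in off-heap memory, which on YARN has to fit within the container's memory overhead rather than the executor heap.

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import java.nio.ByteBuffer
import scala.collection.JavaConverters._

object DirectCopyDemo {
  // Bytes currently reserved by the JVM's "direct" (off-heap) buffer pool.
  def directBytesUsed(): Long =
    ManagementFactory
      .getPlatformMXBeans(classOf[BufferPoolMXBean])
      .asScala
      .find(_.getName == "direct")
      .map(_.getMemoryUsed)
      .getOrElse(0L)

  // Assumed model of the copy: a heap array is copied into a newly
  // allocated direct buffer of exactly the same size before being sent.
  def copyToDirect(heap: Array[Byte]): ByteBuffer = {
    val direct = ByteBuffer.allocateDirect(heap.length)
    direct.put(heap)
    direct.flip()
    direct
  }

  // Simulate `inFlight` concurrent transfers of one block and return
  // (growth of the direct pool, combined capacity of the direct copies).
  def measure(blockSize: Int, inFlight: Int): (Long, Long) = {
    val block = new Array[Byte](blockSize)
    val before = directBytesUsed()
    val buffers = Seq.fill(inFlight)(copyToDirect(block))
    val grown = directBytesUsed() - before
    // Read the buffers after measuring so they stay reachable until here.
    val capacity = buffers.map(_.capacity.toLong).sum
    (grown, capacity)
  }

  def main(args: Array[String]): Unit = {
    val blockSize = 10 * 1024 * 1024 // 10 MB, like the arrays in test.scala
    val inFlight = 8                 // hypothetical number of concurrent fetches
    val (grown, capacity) = measure(blockSize, inFlight)
    println(s"$inFlight in-flight copies: direct pool grew by $grown bytes (capacity $capacity)")
  }
}
```

The reported growth can be slightly larger than the combined capacity depending on JVM settings, and the JVM flag -XX:MaxDirectMemorySize caps this pool within the process; neither changes the point that off-heap usage scales with the number of transfers in flight.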