Posted to user@spark.apache.org by wxhsdp <wx...@gmail.com> on 2014/04/15 04:07:24 UTC

storage.MemoryStore estimated size 7 times larger than real

Hi, all
in order to understand memory usage in Spark, I ran the following test:

val size = 1024 * 1024
val array = new Array[Int](size)

for (i <- 0 until size) {
  array(i) = i
}

val a = sc.parallelize(array).cache()   /* 4 MB */

val b = a.mapPartitions { c =>
  val d = c.toArray

  val e = new Array[Int](2 * size)      /* 8 MB */
  val f = new Array[Int](2 * size)      /* 8 MB */

  for (i <- 0 until 2 * size) {
    e(i) = d(i % size)
    f(i) = d((i + 1) % size)
  }

  (e ++ f).toIterator
}.cache()

When I compile and run under sbt, the estimated sizes of a and b are exactly 7
times larger than the real sizes:

14/04/15 09:10:55 INFO storage.MemoryStore: Block rdd_0_0 stored as values
to memory (estimated size 28.0 MB, free 862.9 MB)
14/04/15 09:10:55 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Added rdd_0_0 in memory on ubuntu.local:59962 (size: 28.0 MB, free: 862.9
MB)

14/04/15 09:10:56 INFO storage.MemoryStore: Block rdd_1_0 stored as values
to memory (estimated size 112.0 MB, free 750.9 MB)
14/04/15 09:10:56 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Added rdd_1_0 in memory on ubuntu.local:59962 (size: 112.0 MB, free: 750.9
MB)

But when I try it in the Spark shell, the estimated size is almost equal to
the real size:

14/04/15 09:23:27 INFO MemoryStore: Block rdd_0_0 stored as values to memory
(estimated size 4.2 MB, free 292.7 MB)
14/04/15 09:23:27 INFO BlockManagerMasterActor$BlockManagerInfo: Added
rdd_0_0 in memory on ubuntu.local:54071 (size: 4.2 MB, free: 292.7 MB)

14/04/15 09:27:40 INFO MemoryStore: Block rdd_1_0 stored as values to memory
(estimated size 17.0 MB, free 275.8 MB)
14/04/15 09:27:40 INFO BlockManagerMasterActor$BlockManagerInfo: Added
rdd_1_0 in memory on ubuntu.local:54071 (size: 17.0 MB, free: 275.8 MB)

Does anyone know the reason? I'm really confused about memory use in Spark.
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n4251/memory.png>

My understanding is: JVM memory and Spark memory live in different parts of
system memory; the Spark code is executed in JVM memory, and an allocation like
val e = new Array[Int](2*size) /*8MB*/ uses JVM memory. If not cached, generated
RDDs are written back to disk; if cached, RDDs are copied to Spark memory. Is
that right?




Re: storage.MemoryStore estimated size 7 times larger than real

Posted by wxhsdp <wx...@gmail.com>.
Thank you so much, Davidson.
Yes, you are right: in both sbt and the Spark shell the result of my code is
28 MB; it's independent of numSlices.
Yesterday I got 4.2 MB in the Spark shell because, out of laziness, I had
removed the array initialization :)

for (i <- 0 until size) {
  array(i) = i
}
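Presumably that happened because an uninitialized Array[Int] is all zeros, and
boxing zero always returns the same cached java.lang.Integer instance, so the
size estimator mostly counts references rather than distinct objects. A rough
sketch of that effect (illustrative code, not what I actually ran):

val zeros = new Array[Int](1024 * 1024)               // never initialized: every element is 0
val boxedZeros: Array[Any] = zeros.map(z => z: Any)   // roughly what ends up in the MemoryStore after boxing
// every slot references the one cached java.lang.Integer for 0
assert(boxedZeros(0).asInstanceOf[AnyRef] eq boxedZeros(12345).asInstanceOf[AnyRef])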




Re: storage.MemoryStore estimated size 7 times larger than real

Posted by Aaron Davidson <il...@gmail.com>.
Ah, I think I can see where your issue may be coming from. In spark-shell,
the MASTER is "local[*]", which means it uses all available cores. This
distinction only matters because the default number of slices created by
sc.parallelize() is based on the number of cores.

So when you run from sbt, you probably use a SparkContext with a "local"
master, which sets the number of cores to 1, meaning you are doing
sc.parallelize(array, 1)

while in Spark Shell you are doing
sc.parallelize(array, 6ish?)

The difference between the two is just that the array is broken up into
more parts in the latter, so you will store blocks for rdd_0_0, rdd_0_1,
..., rdd_0_5 rather than just one (large) block. In both cases, though, I
suspect that the total size is around the same, at around 28 MB.
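You can make the slicing explicit (a quick sketch, assuming the same sc and
array as in your test; the block names are illustrative):

    val oneSlice   = sc.parallelize(array, 1)   // stored as a single block, e.g. rdd_0_0
    val manySlices = sc.parallelize(array, 6)   // stored as six blocks, e.g. rdd_1_0 .. rdd_1_5
    println(oneSlice.partitions.length)         // 1
    println(manySlices.partitions.length)       // 6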

In my case, where I have an RDD[Array[Int]], I have 8 partitions (a number
I just chose randomly), and each one is 512 KB, so the total size is
actually 4 MB. You could do the same test with numSlices = 1, and you'd
just have a single 4 MB block.

The reason our two solutions produced different total memory values is
because of Java primitive boxing [1]. In your case, your RDD[Int] is
converted into an Array[Any] right before being stored into memory, which
makes it effectively an Array[java.lang.Integer] [2]. In my case, the actual
values inside the RDD are primitive arrays, so their elements are never boxed
individually. Spark still converts my RDD[Array[Int]] into an Array[Any], but
an Array[Int] is already an Any, so there's no memory impact there.

[1] http://docs.oracle.com/javase/tutorial/java/data/autoboxing.html
[2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala#L90
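As a rough back-of-the-envelope sketch of the boxing cost (my own illustrative
numbers for a typical 64-bit JVM, not something measured in your run):

    // An Array[Int] costs 4 bytes per element. Once converted to Array[Any], each slot
    // holds a reference (4-8 bytes) to a java.lang.Integer object (~16 bytes with header
    // and padding), so the per-element footprint grows several-fold.
    val primitive: Array[Int] = Array.tabulate(1024 * 1024)(identity)   // ~4 MB of raw ints
    val boxed: Array[Any]     = primitive.map(i => i: Any)              // ~1M boxed java.lang.Integer objects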




Re: storage.MemoryStore estimated size 7 times larger than real

Posted by wxhsdp <wx...@gmail.com>.
Sorry, Davidson, I don't catch the point. What's the essential difference
between our codes?

/* my code */
val array = new Array[Int](size)
val a = sc.parallelize(array).cache()   /* 4 MB */

/* your code */
val numSlices = 8
val arr = Array.fill[Array[Int]](numSlices) { new Array[Int](size / numSlices) }
val rdd = sc.parallelize(arr, numSlices).cache()

I'm in local mode with only one partition, so it's just an RDD of one
partition with type RDD[Int]; your RDD has 8 partitions with type
RDD[Array[Int]]. Does that matter?
My question is why the memory usage is 7x in sbt, but correct in the Spark shell.

As to the follow-up question, I made a mistake, sorry.




Re: storage.MemoryStore estimated size 7 times larger than real

Posted by Aaron Davidson <il...@gmail.com>.
Hey, I was talking about something more like:

    val size = 1024 * 1024
    val numSlices = 8
    val arr = Array.fill[Array[Int]](numSlices) { new Array[Int](size / numSlices) }
    val rdd = sc.parallelize(arr, numSlices).cache()
    val size2 = rdd.map(_.length).sum()
    assert( size2 == size )

If I do this, I see 8 blocks are put into MemoryStore, each with a size of
512.1 KB, which adds up to almost exactly 4MB as expected.

Regarding your other questions:
Non-cached RDDs are not written back to disk; their results are simply not
stored anywhere. If the results are needed again, the RDD will be
recomputed. I'm not sure I understand your distinction between "JVM" and
"Spark" memory -- both arrays and cached RDDs are stored in the JVM heap.

Shuffle operations are unique in that they store intermediate output to
local disk immediately, in order to avoid overly expensive recomputation.
This shuffle data is always written to disk, whether or not the input
RDD(s) are cached, and the final output of the shuffle (the groupBy in your
example) will *not* be cached in memory unless explicitly requested.
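A tiny sketch of that behaviour (assuming an existing SparkContext sc; the
RDD names mirror your groupByKey example, but the code itself is only
illustrative):

    val RDD_0 = sc.parallelize(1 to 1000000).map(i => (i % 10, i))   // not cached
    val RDD_1 = RDD_0.groupByKey()                                   // shuffle boundary between stages

    RDD_1.count()   // computes RDD_0, writes shuffle output to local disk, builds RDD_1;
                    // neither RDD lands in the block store because nothing was cached
    RDD_1.count()   // RDD_1 is recomputed (the shuffle files on disk may be reused);
                    // it still isn't kept in memory unless .cache()/.persist() is called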




Re: storage.MemoryStore estimated size 7 times larger than real

Posted by wxhsdp <wx...@gmail.com>.
Thanks for your help, Davidson!
I modified the line to
val a: RDD[Int] = sc.parallelize(array).cache()
to keep "val a" an RDD of Int, but it gives the same result.

Another question: JVM memory and Spark memory live in different parts of
system memory; the Spark code is executed in JVM memory, and an allocation
like val e = new Array[Int](2*size) /*8MB*/ uses JVM memory. If not cached,
generated RDDs are written back to disk; if cached, RDDs are copied to Spark
memory for further use. Is that right?

val RDD_1 = RDD_0.groupByKey()
A shuffle separates stages. Can anyone tell me the memory/disk usage of the
shuffle-input RDD and the shuffle-output RDD, depending on whether RDD_0 and
RDD_1 are cached or not?






Re: storage.MemoryStore estimated size 7 times larger than real

Posted by Aaron Davidson <il...@gmail.com>.
It's likely the Ints are getting boxed at some point along the journey
(perhaps starting with parallelize()). I could definitely see boxed Ints
being 7 times larger than primitive ones.

If you wanted to be very careful, you could try making an RDD[Array[Int]],
where each element is simply a subset of your original array, and
specifying one partition per element, effectively manually partitioning
your data. I suspect you'd see the 7x overhead disappear.

