Posted to dev@spark.apache.org by Adam Roberts <AR...@uk.ibm.com> on 2016/05/04 17:01:37 UTC

Caching behaviour and deserialized size

Hi, 

I have a very simple test that reads a bigger version of the pom.xml file 
in our Spark home directory (cat'd into itself with a bash for loop until 
it reaches 100 MB). With larger heap sizes, more rdd_* entries are 
reported as being cached. Is this intended behaviour? What exactly are we 
looking at: replicas (the resiliency in RDD), or partitions of the same 
RDD?

With a 512 MB heap (max and initial size), regardless of JDK vendor:

Looking for mybiggerpom.xml in the directory you're running this application from
Added broadcast_0_piece0 in memory on 10.0.2.15:35762 (size: 15.8 KB, free: 159.0 MB)
caching in memory
Added broadcast_1_piece0 in memory on 10.0.2.15:35762 (size: 1789.0 B, free: 159.0 MB)
Added rdd_1_0 in memory on 10.0.2.15:35762 (size: 110.7 MB, free: 48.3 MB)
lines.count(): 2790700

Yet if I increase it to 1024 MB (again max and initial size), I see this:

Looking for mybiggerpom.xml in the directory you're running this application from
Added broadcast_0_piece0 in memory on 10.0.2.15:39739 (size: 15.8 KB, free: 543.0 MB)
caching in memory
Added broadcast_1_piece0 in memory on 10.0.2.15:39739 (size: 1789.0 B, free: 543.0 MB)
Added rdd_1_0 in memory on 10.0.2.15:39739 (size: 110.7 MB, free: 432.3 MB)
Added rdd_1_1 in memory on 10.0.2.15:39739 (size: 107.3 MB, free: 325.0 MB)
Added rdd_1_2 in memory on 10.0.2.15:39739 (size: 107.0 MB, free: 218.1 MB)
lines.count(): 2790700

My simple test case:
//scalastyle:off

import java.io.File
import org.apache.spark._
import org.apache.spark.rdd._

object Trimmed {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("Adam RDD cached size experiment")
      .setMaster("local[1]"))

    // Default input file; override it with the first command-line argument
    var fileName = "mybiggerpom.xml"
    if (args != null && args.length > 0) {
      fileName = args(0)
    }
    println("Looking for " + fileName +
      " in the directory you're running this application from")
    val lines = sc.textFile(fileName)
    println("caching in memory")
    // cache() is lazy; the count() below triggers the read and the caching
    lines.cache()
    println("lines.count(): " + lines.count())
  }
}
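
One way to narrow down the replicas-versus-partitions question might be 
to print the RDD's partition count and replication factor next to the 
number of partitions Spark reports as cached. The sketch below is only 
an illustration: the object name CachedPartitionCheck and the helper 
cachedVsTotal are hypothetical, and sc.getRDDStorageInfo is marked as a 
developer API, so its shape may change between releases.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CachedPartitionCheck {

  // Hypothetical helper: caches the file, forces evaluation, and returns
  // (number of cached partitions, total number of partitions).
  def cachedVsTotal(sc: SparkContext, path: String): (Int, Int) = {
    val lines = sc.textFile(path).cache()
    lines.count() // cache() is lazy, so an action is needed first
    val cached = sc.getRDDStorageInfo
      .find(_.id == lines.id)
      .map(_.numCachedPartitions)
      .getOrElse(0) // no entry means nothing was cached for this RDD
    (cached, lines.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("cached partition check")
      .setMaster("local[1]"))
    try {
      val fileName = if (args.nonEmpty) args(0) else "mybiggerpom.xml"
      val (cached, total) = cachedVsTotal(sc, fileName)
      println("cached partitions: " + cached + " of " + total)
      // Replication factor of the storage level that cache() selects
      val level = sc.textFile(fileName).cache().getStorageLevel
      println("replication: " + level.replication)
    } finally {
      sc.stop()
    }
  }
}
```

If the total matches the number of rdd_1_N lines seen with the larger 
heap, and the smaller heap reports fewer cached partitions than the 
total, that would point at partitions rather than replicas.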

I also want to figure out where the cached RDD size value comes from. I 
noticed deserializedSize is used (in BlockManagerMasterEndpoint.scala); 
where does this value come from? I understand SizeEstimator plays a big 
role, but it's unclear what is responsible for computing deserializedSize 
in the first place, despite my best efforts with IntelliJ and a lot of 
grepping.
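
As a rough cross-check on the reported sizes, SizeEstimator can be called 
directly on the materialized contents of each partition and the result 
compared with the "size:" values in the log. This is only a sketch under 
the assumption that the logged size is a SizeEstimator-based estimate of 
the deserialized partition; it does not show where Spark itself sets 
deserializedSize. The object name PartitionSizeEstimate and the helper 
estimatePerPartition are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.SizeEstimator

object PartitionSizeEstimate {

  // Hypothetical helper: returns (partition index, line count, estimated
  // in-memory bytes) for every partition of the text file.
  def estimatePerPartition(sc: SparkContext,
                           path: String): Array[(Int, Int, Long)] =
    sc.textFile(path).mapPartitionsWithIndex { (idx, it) =>
      // Materialize the whole partition so the estimate covers all lines;
      // this needs enough heap to hold one partition at a time.
      val values = it.toArray
      Iterator((idx, values.length, SizeEstimator.estimate(values)))
    }.collect()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("partition size estimate")
      .setMaster("local[1]"))
    try {
      val fileName = if (args.nonEmpty) args(0) else "mybiggerpom.xml"
      estimatePerPartition(sc, fileName).foreach {
        case (idx, count, bytes) =>
          println("partition " + idx + ": " + count + " lines, ~" +
            bytes + " bytes")
      }
    } finally {
      sc.stop()
    }
  }
}
```

The estimates may not match the logged values exactly, since Spark could 
be measuring a different container type than a plain array.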

I'm using recent Spark 2.0 code. Any guidance here will be appreciated, 
cheers.



