Posted to user@spark.apache.org by Jia Zhan <zh...@gmail.com> on 2015/10/16 23:02:31 UTC

In-memory computing and cache() in Spark

Hi all,

I am running Spark locally on one node and sweeping the memory size
for performance tuning. The machine has 8 CPUs and 16 GB of main memory; the
dataset on my local disk is about 10 GB. I have a few quick questions and
would appreciate any comments.

1. Spark performs in-memory computing, but without RDD.cache(), will
anything be cached in memory at all? My guess is that, without RDD.cache(),
only a small amount of data will be held in the OS buffer cache, and every
iteration of the computation will still need to fetch most of the data from
disk. Is that right?

2. To evaluate how caching helps with iterative computation, I wrote a
simple program, shown below, which consists of one saveAsTextFile()
and three reduce() actions/stages. I set "spark.driver.memory" to
"15g" and left everything else at the defaults. Then I ran three experiments.

      import org.apache.spark.{SparkConf, SparkContext}

      val conf = new SparkConf().setAppName("wordCount")
      val sc = new SparkContext(conf)
      val input = sc.textFile("/InputFiles")

      // Job 1: word count over the input, written out as text files
      val words = input.flatMap(line => line.split(" ")).map(word => (word, 1))
        .reduceByKey(_ + _).saveAsTextFile("/OutputFiles")

      val ITERATIONS = 3
      for (i <- 1 to ITERATIONS) {
        // Jobs 2-4: reduce() is an action, so each iteration submits a job
        val totallength = input.filter(line => line.contains("the"))
          .map(s => s.length).reduce((a, b) => a + b)
      }

(I) The first run: no caching at all. The application finishes in ~12
minutes (2.6min+3.3min+3.2min+3.3min)

(II) The second run: I modified the code so that the input is cached:
                 val input = sc.textFile("/InputFiles").cache()
     The application finishes in ~11 mins!! (5.4min+1.9min+1.9min+2.0min)!
     The storage page in the Web UI shows that 48% of the dataset is cached, which
makes sense given the large Java object overhead and the fact that
spark.storage.memoryFraction is 0.6 by default.
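
As a rough check on the 48% figure, the sketch below estimates how much heap the legacy Spark 1.x memory model leaves for cached blocks. It assumes storage memory is approximately heap x spark.storage.memoryFraction x spark.storage.safetyFraction; the 0.9 safety fraction and the names StorageEstimate and storageGb are illustrative assumptions, not something stated in this thread.

```scala
// Back-of-the-envelope estimate of cache (storage) memory under the
// legacy Spark 1.x memory model. This is a sketch, not Spark's own code.
object StorageEstimate {
  // Default of spark.storage.memoryFraction, as quoted in the post.
  val MemoryFraction = 0.6
  // Assumed default of spark.storage.safetyFraction (not mentioned in the thread).
  val SafetyFraction = 0.9

  // Upper bound on the heap (in GB) available for cached RDD blocks.
  def storageGb(heapGb: Double): Double =
    heapGb * MemoryFraction * SafetyFraction

  def main(args: Array[String]): Unit = {
    // The two driver heap sizes used in the experiments.
    for (heapGb <- Seq(15.0, 2.0)) {
      println(f"heap = $heapGb%.1f GB -> about ${storageGb(heapGb)}%.2f GB for cached blocks")
    }
  }
}
```

Under these assumptions a 15g heap leaves roughly 8.1 GB for cached blocks and a 2g heap roughly 1.08 GB. The real limit is usually somewhat lower, because the JVM reports slightly less than the configured heap as usable.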

(III) The third run, however: same program as the second one, but I changed
"spark.driver.memory" to "2g".
   The application finishes in just 3.6 minutes (3.0min + 9s + 9s + 9s)!!
And the UI shows that 6% of the data is cached.

*From the results we can see that the reduce stages finish in seconds. How could
that happen with only 6% cached? Can anyone explain?*

I am new to Spark and would appreciate any help on this. Thanks!

Jia

Re: In-memory computing and cache() in Spark

Posted by Jia Zhan <zh...@gmail.com>.
Hi Igor,

It iteratively calls reduce((a, b) => a + b), which is the action there. I can
clearly see 4 stages (one saveAsTextFile() and three reduce()) in the web
UI. I don't know what is going on there that causes the non-intuitive caching
behavior.

Thanks for the help!

On Sun, Oct 18, 2015 at 11:32 PM, Igor Berman <ig...@gmail.com> wrote:

> Do your iterations really submit a job? I don't see any action there...


-- 
Jia Zhan

Re: In-memory computing and cache() in Spark

Posted by Igor Berman <ig...@gmail.com>.
Do your iterations really submit a job? I don't see any action there...

Re: In-memory computing and cache() in Spark

Posted by Jia Zhan <zh...@gmail.com>.
Hi Sonal,

I tried changing spark.executor.memory, but nothing changes. It
seems that when I run locally on one machine, the RDD is cached in driver memory
instead of executor memory. Here is a related post online:
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-Local-Mode-td22279.html

When I change spark.driver.memory, I can see the amount of cached data change in
the web UI. Like I mentioned, when I set driver memory to 2G, it says 6% of the RDD
is cached. When set to 15G, it says 48% of the RDD is cached, but with much slower
speed!
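
One way to cross-check these numbers without the web UI is to ask the SparkContext directly. The sketch below is a minimal example under stated assumptions: a local master, the /InputFiles path from the original post, and a hypothetical object name CacheReport. It relies on SparkContext.getExecutorMemoryStatus and on getRDDStorageInfo, which is marked as a developer API.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper: prints block-manager memory and the cached share of each RDD.
object CacheReport {
  def main(args: Array[String]): Unit = {
    // In local mode the executor runs inside the driver JVM, so the heap has
    // to be set when that JVM is launched (for example with
    // spark-submit --driver-memory 2g) rather than from inside the application.
    val conf = new SparkConf().setAppName("cacheReport").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val input = sc.textFile("/InputFiles").cache()
    input.count() // run one action so that the cache is populated

    // Maximum and remaining memory available for caching, per block manager.
    sc.getExecutorMemoryStatus.foreach { case (id, (max, remaining)) =>
      println(s"$id: max = ${max >> 20} MB, remaining = ${remaining >> 20} MB")
    }

    // Number of partitions of each persisted RDD that actually fit in the cache.
    sc.getRDDStorageInfo.foreach { info =>
      println(s"RDD ${info.id}: ${info.numCachedPartitions} of ${info.numPartitions} " +
        s"partitions cached, ${info.memSize >> 20} MB in memory")
    }

    sc.stop()
  }
}
```

Running it once per driver memory setting should show whether the block manager's maximum really follows spark.driver.memory rather than spark.executor.memory in local mode.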

On Sun, Oct 18, 2015 at 10:32 PM, Sonal Goyal <so...@gmail.com> wrote:

> Hi Jia,
>
> RDDs are cached on the executor, not on the driver. I am assuming you are
> running locally and haven't changed spark.executor.memory?
>
> Sonal


-- 
Jia Zhan

Re: In-memory computing and cache() in Spark

Posted by Sonal Goyal <so...@gmail.com>.
Hi Jia,

RDDs are cached on the executor, not on the driver. I am assuming you are
running locally and haven't changed spark.executor.memory?

Sonal

Re: In-memory computing and cache() in Spark

Posted by Jia Zhan <zh...@gmail.com>.
Does anyone have a clue what's going on? Why would caching with 2g of memory
be much faster than with 15g of memory?

Thanks very much!


-- 
Jia Zhan