You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by "Ulanov, Alexander" <al...@hpe.com> on 2015/10/10 01:35:37 UTC

Operations with cached RDD

Dear Spark developers,

I am trying to understand how Spark UI displays operation with the cached RDD.

For example, the following code caches an rdd:
>> val rdd = sc.parallelize(1 to 5, 5).zipWithIndex.cache
>> rdd.count
The Jobs tab shows me that the RDD is evaluated:
: 1 count at <console>:24              2015/10/09 16:15:43        0.4 s       1/1
: 0 zipWithIndex at <console> :21             2015/10/09 16:15:38        0.6 s       1/1
An I can observe this rdd in the Storage tab of Spark UI:
: ZippedWithIndexRDD  Memory Deserialized 1x Replicated

Then I want to make an operation over the cached RDD. I run the following code:
>> val g = rdd.groupByKey()
>> g.count
The Jobs tab shows me a new Job:
: 2 count at <console>:26
Inside this Job there are two stages:
: 3 count at <console>:26 +details 2015/10/09 16:16:18   0.2 s       5/5
: 2 zipWithIndex at <console>:21
It shows that zipWithIndex is executed again. It does not seem to be reasonable, because the rdd is cached, and zipWithIndex is already executed previously.

Could you explain why if I perform an operation followed by an action on a cached RDD, then the last operation in the lineage of the cached RDD is shown to be executed in the Spark UI?


Best regards, Alexander

RE: Operations with cached RDD

Posted by "Ulanov, Alexander" <al...@hpe.com>.
Thank you, Nitin. This does explain the problem. It seems that UI should make this more clear to the user, otherwise it is simply misleading if you read it as it.

From: Nitin Goyal [mailto:nitin2goyal@gmail.com]
Sent: Sunday, October 11, 2015 5:57 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Operations with cached RDD

The problem is not that zipWithIndex is executed again. "groupBy" triggered hash partitioning on your keys and a shuffle happened due to that and that's why you are seeing 2 stages. You can confirm this by clicking on latter "zipWithIndex" stage and input data has "(memory)" written which means input data has been fetched from memory (your cached RDD).

As far as lineage/call site is concerned, I think there was a change in spark 1.3 which excluded some classes from appearing in call site (I know that some Spark SQL related were removed for sure).

Thanks
-Nitin


On Sat, Oct 10, 2015 at 5:05 AM, Ulanov, Alexander <al...@hpe.com>> wrote:
Dear Spark developers,

I am trying to understand how Spark UI displays operation with the cached RDD.

For example, the following code caches an rdd:
>> val rdd = sc.parallelize(1 to 5, 5).zipWithIndex.cache
>> rdd.count
The Jobs tab shows me that the RDD is evaluated:
: 1 count at <console>:24              2015/10/09 16:15:43        0.4 s       1/1
: 0 zipWithIndex at <console> :21             2015/10/09 16:15:38        0.6 s       1/1
An I can observe this rdd in the Storage tab of Spark UI:
: ZippedWithIndexRDD  Memory Deserialized 1x Replicated

Then I want to make an operation over the cached RDD. I run the following code:
>> val g = rdd.groupByKey()
>> g.count
The Jobs tab shows me a new Job:
: 2 count at <console>:26
Inside this Job there are two stages:
: 3 count at <console>:26 +details 2015/10/09 16:16:18   0.2 s       5/5
: 2 zipWithIndex at <console>:21
It shows that zipWithIndex is executed again. It does not seem to be reasonable, because the rdd is cached, and zipWithIndex is already executed previously.

Could you explain why if I perform an operation followed by an action on a cached RDD, then the last operation in the lineage of the cached RDD is shown to be executed in the Spark UI?


Best regards, Alexander



--
Regards
Nitin Goyal

Re: Operations with cached RDD

Posted by Nitin Goyal <ni...@gmail.com>.
The problem is not that zipWithIndex is executed again. "groupBy" triggered
hash partitioning on your keys and a shuffle happened due to that and
that's why you are seeing 2 stages. You can confirm this by clicking on
latter "zipWithIndex" stage and input data has "(memory)" written which
means input data has been fetched from memory (your cached RDD).

As far as lineage/call site is concerned, I think there was a change in
spark 1.3 which excluded some classes from appearing in call site (I know
that some Spark SQL related were removed for sure).

Thanks
-Nitin


On Sat, Oct 10, 2015 at 5:05 AM, Ulanov, Alexander <alexander.ulanov@hpe.com
> wrote:

> Dear Spark developers,
>
>
>
> I am trying to understand how Spark UI displays operation with the cached
> RDD.
>
>
>
> For example, the following code caches an rdd:
>
> >> val rdd = sc.parallelize(1 to 5, 5).zipWithIndex.cache
>
> >> rdd.count
>
> The Jobs tab shows me that the RDD is evaluated:
>
> : 1 count at <console>:24              2015/10/09 16:15:43        0.4
> s       1/1
>
> : 0 zipWithIndex at <console> :21             2015/10/09 16:15:38
> 0.6 s       1/1
>
> An I can observe this rdd in the Storage tab of Spark UI:
>
> : ZippedWithIndexRDD  Memory Deserialized 1x Replicated
>
>
>
> Then I want to make an operation over the cached RDD. I run the following
> code:
>
> >> val g = rdd.groupByKey()
>
> >> g.count
>
> The Jobs tab shows me a new Job:
>
> : 2 count at <console>:26
>
> Inside this Job there are two stages:
>
> : 3 count at <console>:26 +details 2015/10/09 16:16:18   0.2 s
> 5/5
>
> : 2 zipWithIndex at <console>:21
>
> It shows that zipWithIndex is executed again. It does not seem to be
> reasonable, because the rdd is cached, and zipWithIndex is already executed
> previously.
>
>
>
> Could you explain why if I perform an operation followed by an action on a
> cached RDD, then the last operation in the lineage of the cached RDD is
> shown to be executed in the Spark UI?
>
>
>
>
>
> Best regards, Alexander
>



-- 
Regards
Nitin Goyal