You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Tom Hubregtsen (JIRA)" <ji...@apache.org> on 2015/04/20 23:46:59 UTC

[jira] [Comment Edited] (SPARK-7002) Persist on RDD fails the second time if the action is called on a child RDD without showing a FAILED message

    [ https://issues.apache.org/jira/browse/SPARK-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14503730#comment-14503730 ] 

Tom Hubregtsen edited comment on SPARK-7002 at 4/20/15 9:46 PM:
----------------------------------------------------------------

Your speculation was correct:

After the above computation, I performed the next extra steps:

I first tried to remove the data from rdd3, unpersisting it
{code}
scala> rdd3.unpersist() 
scala> rdd3.collect()
{code}
--> This did not work, rdd2 was still not on the disk

I then looked in the file system and found shuffle data. I removed these manually (shuffle_0_0_0.data and shuffle_0_0_0.index), after which I invoked the action on the child
{code}
scala> rdd3.collect()
{code}
--> This worked, rdd2 appeared on disk

Next to this, I also looked if a different action that could not rely on these shuffle files would invoke computation on rdd2 (as per your suggestion; FYI, I performed these two experiments separately from each other so that they don't influence each other):
{code}
scala> val rdd4 = rdd2.reduceByKey( (x,y) => x*y)
scala> rdd4.collect()
{code}
--> This worked too, rdd2 appeared on disk again

Conclusion: Rdd2 was actually not recomputed, as rdd3 was using the shuffle data that was stored on disk. 

Action: Should we still do something about the message in .toDebugString? It currently mentions when data is persisted on either disk or memory, but does not mention that it is saving the shuffle data. I do believe this is something you want to know. I at least called this method with the intention to know where in my DAG data is actually present, and got to believe data was not present, while in fact it was.


was (Author: thubregtsen):
Your speculation was correct:

After the above computation, I performed the next extra steps:

I first tried to remove the data from rdd3, unpersisting it
scala> rdd3.unpersist() 
scala> rdd3.collect()
--> This did not work, rdd2 was still not on the disk

I then looked in the file system and found shuffle data. I removed these manually (shuffle_0_0_0.data and shuffle_0_0_0.index), after which I invoked the action on the child
scala> rdd3.collect()
--> This worked, rdd2 appeared on disk

Next to this, I also looked if a different action that could not rely on these shuffle files would invoke computation on rdd2 (as per your suggestion; FYI, I performed these two experiments separately from each other so that they don't influence each other):
scala> val rdd4 = rdd2.reduceByKey( (x,y) => x*y)
scala> rdd4.collect()
--> This worked too, rdd2 appeared on disk again

Conclusion: Rdd2 was actually not recomputed, as rdd3 was using the shuffle data that was stored on disk. 

Action: Should we still do something about the message in .toDebugString? It currently mentions when data is persisted on either disk or memory, but does not mention that it is saving the shuffle data. I do believe this is something you want to know. I at least called this method with the intention to know where in my DAG data is actually present, and got to believe data was not present, while in fact it was.

> Persist on RDD fails the second time if the action is called on a child RDD without showing a FAILED message
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7002
>                 URL: https://issues.apache.org/jira/browse/SPARK-7002
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.0
>         Environment: Platform: Power8
> OS: Ubuntu 14.10
> Java: java-8-openjdk-ppc64el
>            Reporter: Tom Hubregtsen
>            Priority: Minor
>              Labels: disk, persist, unpersist
>
> The major issue is: Persist on RDD fails the second time if the action is called on a child RDD without showing a FAILED message. This is pointed out at 2)
> next to this:
> toDebugString on a child RDD does not show that the parent RDD is [Disk Serialized 1x Replicated]. This is pointed out at 1)
> Note: I am persisting to disk (DISK_ONLY) to validate that the RDD is or is not physically stored, as I did not want to solely rely on a missing line in .toDebugString (see comments in trace)
> {code}
> scala> val rdd1 = sc.parallelize(List(1,2,3))
> scala> val rdd2 = rdd1.map(x => (x,x+1))
> scala> val rdd3 = rdd2.reduceByKey( (x,y) => x+y)
> scala> import org.apache.spark.storage.StorageLevel
> scala> rdd2.persist(StorageLevel.DISK_ONLY)
> scala> rdd3.collect()
> scala> rdd2.toDebugString
> res4: String = 
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
>   \|        CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
>   \|   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res5: String = 
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
>   +-(100) MapPartitionsRDD[1] at map at <console>:23 []
>       \|       CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
>       \|   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // 1) rdd3 does not show that the other RDD's are [Disk Serialized 1x Replicated], but the data is on disk. This is verified by
> // a) The line starting with CachedPartitions
> // b) a find in spark_local_dir: "find . -name "\*"  \| grep rdd" returns "./spark-b39bcf9b-e7d7-4284-bdd2-1be7ac3cacef/blockmgr-4f4c0b1c-b47a-4972-b364-7179ea6e0873/1f/rdd_4_*", where * are the number of partitions
> scala> rdd2.unpersist()
> scala> rdd2.toDebugString
> res8: String = 
> (100) MapPartitionsRDD[1] at map at <console>:23 []
>   \|   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> scala> rdd3.toDebugString
> res9: String = 
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
>   +-(100) MapPartitionsRDD[1] at map at <console>:23 []
>       \|   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // successfully unpersisted, also not visible on disk
> scala> rdd2.persist(StorageLevel.DISK_ONLY)
> scala> rdd3.collect()
> scala> rdd2.toDebugString
> res18: String = 
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
>   \|   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res19: String = 
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
>   +-(100) MapPartitionsRDD[1] at map at <console>:23 []
>       \|   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // 2) The data is not visible on disk though the find command previously mentioned, and is also not mentioned in the toDebugString (no line starting with CachedPartitions, even though  [Disk Serialized 1x Replicated] is mentioned). It does work when you call the action on the actual RDD:
> scala> rdd2.collect()
> scala> rdd2.toDebugString
> res21: String = 
> (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
>   \|        CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
>   \|   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
> scala> rdd3.toDebugString
> res22: String = 
> (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
>   +-(100) MapPartitionsRDD[1] at map at <console>:23 []
>       \|       CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
>       \|   ParallelCollectionRDD[0] at parallelize at <console>:21 []
> // Data appears on disk again (using find command preciously mentioned), and line with CachedPartitions is back in the .toDebugString
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org