You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by leo9r <le...@gmail.com> on 2017/02/04 09:11:14 UTC

How to checkpoint and RDD after a stage and before reaching an action?

Hi,

I have a 1-action job (saveAsObjectFile at the end), that includes several
stages. One of those stages is an expensive join "rdd1.join(rdd2)". I would
like to checkpoint rdd1 right before the join to improve the stability of
the job. However, what I'm seeing is that the job gets executed all the way
to the end (saveAsObjectFile) without doing any checkpointing, and then
re-runing the computation to checkpoint rdd1 (when I see the files saved to
the checkpoint directory). I have no issue with recomputing, given that I'm
not caching rdd1, but the fact that the checkpointing of rdd1 happens after
the join brings no benefit because the whole DAG is executed in one piece
and the job fails. If that is actually what is happening, what would be the
best approach to solve this? 
What I'm currently doing is to manually save rdd1 to HDFS right after the
filter in line (4) and then load it back right before the join in line (11).
That prevents the job from failing by splitting it into 2 jobs (ie. 2
actions). My expectations was that rdd1.checkpoint in line (8) was going to
have the same effect but without the hassle of manually saving and loading
intermediate files.

///////////////////////////////////////////////

(1)   val rdd1 = loadData1
(2)     .map
(3)     .groupByKey
(4)     .filter
(5)
(6)   val rdd2 = loadData2
(7)
(8)   rdd1.checkpoint()
(9)
(10)  rdd1
(11)    .join(rdd2)
(12)    .saveAsObjectFile(...)

/////////////////////////////////////////////

Thanks in advance,
Leo



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/How-to-checkpoint-and-RDD-after-a-stage-and-before-reaching-an-action-tp20852.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: How to checkpoint and RDD after a stage and before reaching an action?

Posted by Koert Kuipers <ko...@tresata.com>.
this is a general problem with checkpoint, one of the least understood
operations i think.

checkpoint is lazy (meaning it doesnt start until there is an action) and
asynchronous (meaning when it does start it is its own computation). so
basically with a checkpoint the rdd always gets computed twice.

i think the only useful pattern for checkpoint is to always persist/cache
right before the checkpoint. so:
rdd.persist(...).checkpoint()

On Sat, Feb 4, 2017 at 4:11 AM, leo9r <le...@gmail.com> wrote:

> Hi,
>
> I have a 1-action job (saveAsObjectFile at the end), that includes several
> stages. One of those stages is an expensive join "rdd1.join(rdd2)". I would
> like to checkpoint rdd1 right before the join to improve the stability of
> the job. However, what I'm seeing is that the job gets executed all the way
> to the end (saveAsObjectFile) without doing any checkpointing, and then
> re-runing the computation to checkpoint rdd1 (when I see the files saved to
> the checkpoint directory). I have no issue with recomputing, given that I'm
> not caching rdd1, but the fact that the checkpointing of rdd1 happens after
> the join brings no benefit because the whole DAG is executed in one piece
> and the job fails. If that is actually what is happening, what would be the
> best approach to solve this?
> What I'm currently doing is to manually save rdd1 to HDFS right after the
> filter in line (4) and then load it back right before the join in line
> (11).
> That prevents the job from failing by splitting it into 2 jobs (ie. 2
> actions). My expectations was that rdd1.checkpoint in line (8) was going to
> have the same effect but without the hassle of manually saving and loading
> intermediate files.
>
> ///////////////////////////////////////////////
>
> (1)   val rdd1 = loadData1
> (2)     .map
> (3)     .groupByKey
> (4)     .filter
> (5)
> (6)   val rdd2 = loadData2
> (7)
> (8)   rdd1.checkpoint()
> (9)
> (10)  rdd1
> (11)    .join(rdd2)
> (12)    .saveAsObjectFile(...)
>
> /////////////////////////////////////////////
>
> Thanks in advance,
> Leo
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/How-to-checkpoint-
> and-RDD-after-a-stage-and-before-reaching-an-action-tp20852.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>
>

Re: How to checkpoint and RDD after a stage and before reaching an action?

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
Hi Leo,

The checkpointing of a RDD will be performed after a job using this RDD has
completed. Since you have only one job, rdd1 will only be checkpointed after
it is finished.

To checkpoint rdd1, you can simply materialize (and maybe cache it to avoid
recomputation) rdd1 (e.g., rdd1.count) after calling rdd1.checkpoint().



leo9r wrote
> Hi,
> 
> I have a 1-action job (saveAsObjectFile at the end), that includes several
> stages. One of those stages is an expensive join "rdd1.join(rdd2)". I
> would like to checkpoint rdd1 right before the join to improve the
> stability of the job. However, what I'm seeing is that the job gets
> executed all the way to the end (saveAsObjectFile) without doing any
> checkpointing, and then re-runing the computation to checkpoint rdd1 (when
> I see the files saved to the checkpoint directory). I have no issue with
> recomputing, given that I'm not caching rdd1, but the fact that the
> checkpointing of rdd1 happens after the join brings no benefit because the
> whole DAG is executed in one piece and the job fails. If that is actually
> what is happening, what would be the best approach to solve this? 
> What I'm currently doing is to manually save rdd1 to HDFS right after the
> filter in line (4) and then load it back right before the join in line
> (11). That prevents the job from failing by splitting it into 2 jobs (ie.
> 2 actions). My expectations was that rdd1.checkpoint in line (8) was going
> to have the same effect but without the hassle of manually saving and
> loading intermediate files.
> 
> ///////////////////////////////////////////////
> 
> (1)   val rdd1 = loadData1
> (2)     .map
> (3)     .groupByKey
> (4)     .filter
> (5)
> (6)   val rdd2 = loadData2
> (7)
> (8)   rdd1.checkpoint()
> (9)
> (10)  rdd1
> (11)    .join(rdd2)
> (12)    .saveAsObjectFile(...)
> 
> /////////////////////////////////////////////
> 
> Thanks in advance,
> Leo





-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/How-to-checkpoint-and-RDD-after-a-stage-and-before-reaching-an-action-tp20852p20862.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org