Posted to user@spark.apache.org by Ivan Petrov <ca...@gmail.com> on 2020/08/19 12:38:56 UTC

RDD which was checkpointed is not checkpointed

Hi!
It seems I'm doing something wrong. I call .checkpoint() on an RDD, but it's not
checkpointed...
What am I doing wrong?

val recordsRDD = convertToRecords(anotherRDD)
recordsRDD.checkpoint()
logger.info("checkpoint done")

logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")

Output:
Job$ - checkpoint done (!!!)

But then.....
Job$ - isCheckpointed? false, getCheckpointFile: None
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at  Job.scala:112 []
 |  MapPartitionsRDD[6] at map at  Job.scala:111 []
 |  MapPartitionsRDD[5] at map at ....scala:40 []
 |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
 +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
    |  MapPartitionsRDD[2] at map at ...:66 []

Re: RDD which was checkpointed is not checkpointed

Posted by Ivan Petrov <ca...@gmail.com>.
Awesome, thanks for explaining it.


Re: RDD which was checkpointed is not checkpointed

Posted by Russell Spitzer <ru...@gmail.com>.
Spark determines whether it can use the checkpoint at runtime, so you'll be
able to see it in the UI but not in the plan: you are looking at the plan
before the job is actually running, and it is only while the job runs that
Spark checks whether it can use the checkpoint in the lineage.

Here is a two-stage job, for example:

scala> val x = sc.parallelize(Seq("foo","bar"))
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val y = x.repartition(3)
y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at repartition at <console>:25

scala> y.checkpoint

scala> y.count
res12: Long = 2

[image: image.png]

[image: image.png]

scala> y.count
res13: Long = 2

[image: image.png]

Notice that we were able to skip the first stage: when Stage 11 looked for
its dependencies, it found a checkpointed version of the partitioned data, so
it didn't need to repartition again. This turns my two-stage job into a
two-stage job with one stage skipped, or effectively a one-stage job.
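
The same effect can also be observed from code rather than from the UI. The following is only a sketch, assuming a local session and a temporary checkpoint directory (both are my assumptions, not part of the original job). It looks at the parent of y before and after the first action; as far as I understand, once the checkpoint is written, RDD.dependencies points at the checkpoint RDD instead of the original lineage.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell, `spark` and `sc` already exist.
val spark = SparkSession.builder().master("local[2]").appName("checkpoint-lineage").getOrCreate()
val sc = spark.sparkContext

// A checkpoint directory must be set first; the temp dir is an assumption.
sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoints").toString)

val x = sc.parallelize(Seq("foo", "bar"))
val y = x.repartition(3)
y.checkpoint()

// Parent of y as seen BEFORE any job has run: still the original lineage.
val parentBefore = y.dependencies.head.rdd.getClass.getSimpleName

y.count() // first action: runs the job, then writes the checkpoint

// Parent of y AFTER the checkpoint has been written.
val parentAfter = y.dependencies.head.rdd.getClass.getSimpleName

println(s"parent before: $parentBefore, parent after: $parentAfter")
```

Anything printed before the first action still describes the original lineage, which is why the plan printed in the job did not show the checkpoint.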




Re: RDD which was checkpointed is not checkpointed

Posted by Ivan Petrov <ca...@gmail.com>.
I did it and I see the lineage change.

BEFORE calling an action. No success.

Job$ - isCheckpointed? false, getCheckpointFile: None
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at  Job.scala:112 []
 |  MapPartitionsRDD[6] at map at  Job.scala:111 []
 |  MapPartitionsRDD[5] at map at ....scala:40 []
 |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
 +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
    |  MapPartitionsRDD[2] at map at ...:66 []

AFTER calling an action. Nice, it works!
Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at  Job.scala:112 []

The lineage now contains only one stage, but I want to get rid of it too. This
stage happens right before the checkpoint. Will Spark try to re-run it in
case of a task failure AFTER the checkpoint?
My expectation is that Spark will read directly from the checkpoint dir; it
shouldn't have to do anything with the previous MapPartitionsRDD[7] at map at
Job.scala:112.


Re: RDD which was checkpointed is not checkpointed

Posted by Russell Spitzer <ru...@gmail.com>.
Checkpoint is lazy and needs an action to actually do the work. The method
just marks the RDD for checkpointing, as noted in the doc you posted.

Call an action twice. The second run should use the checkpoint.
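
To make the laziness concrete, here is a minimal sketch. The local session, the temporary checkpoint directory and the small pipeline standing in for convertToRecords(anotherRDD) are all assumptions made for illustration. It records isCheckpointed right after checkpoint() and again after the first action.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell, `spark` and `sc` already exist.
val spark = SparkSession.builder().master("local[2]").appName("checkpoint-is-lazy").getOrCreate()
val sc = spark.sparkContext

// A checkpoint directory must be set before calling checkpoint(); the temp dir is an assumption.
sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoints").toString)

// Hypothetical stand-in for the thread's convertToRecords(anotherRDD).
val recordsRDD = sc.parallelize(1 to 10, 2)
  .map(i => (i % 3, i))
  .reduceByKey(_ + _)
  .map(_.toString)

recordsRDD.checkpoint()                  // only marks the RDD; nothing is written yet
val before = recordsRDD.isCheckpointed   // state before any action has run

recordsRDD.count()                       // the first action triggers the checkpoint write
val after = recordsRDD.isCheckpointed    // state after the action
val file = recordsRDD.getCheckpointFile  // location under the checkpoint directory, if any

println(s"before=$before, after=$after, file=$file")
println(recordsRDD.toDebugString)
```

Logging "checkpoint done" right after checkpoint() is therefore misleading: at that point the RDD has only been marked.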




Re: RDD which was checkpointed is not checkpointed

Posted by Jacob Lynn <ab...@gmail.com>.
Oops, you're right. My earlier, incorrect answer applies only to DataFrames
(2.1+), not RDDs.
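
The difference between the two APIs can be sketched as follows. This assumes a local session and a temporary checkpoint directory, and the sample data is made up. Dataset.checkpoint() returns a new Dataset that has to be assigned, while RDD.checkpoint() returns Unit and only marks the RDD it is called on.

```scala
import java.nio.file.Files
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Local session for illustration only; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[2]").appName("df-vs-rdd-checkpoint").getOrCreate()
spark.sparkContext.setCheckpointDir(Files.createTempDirectory("spark-checkpoints").toString)

// DataFrame/Dataset: checkpoint() is eager by default and returns a NEW Dataset,
// so the result must be assigned and used from then on.
val df: DataFrame = spark.range(10).toDF("id")
val checkpointedDf: DataFrame = df.checkpoint()

// RDD: checkpoint() returns Unit, so there is nothing to assign.
// It marks the RDD in place, and the next action writes the checkpoint.
val rdd: RDD[Int] = spark.sparkContext.parallelize(1 to 10, 2).map(_ * 2)
rdd.checkpoint()
rdd.count()

println(s"DataFrame rows: ${checkpointedDf.count()}, RDD checkpointed: ${rdd.isCheckpointed}")
```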


Re: RDD which was checkpointed is not checkpointed

Posted by Ivan Petrov <ca...@gmail.com>.
I think checkpoint() returns Unit, so that won't work.
[image: image.png]

I found another way to make it work: I called an action after checkpoint().

val recordsRDD = convertToRecords(anotherRDD)
recordsRDD.checkpoint()
logger.info("checkpoint done")
recordsRDD.count() // (!!!)
logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")

    Output:
    Job$ - checkpoint done (!!!)

    Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
    Job$ - recordsRDD.toDebugString:
    (2) MapPartitionsRDD[7] at map at  Job.scala:112 []

But it still has a single MapPartitionsRDD in the lineage. The lineage became
shorter, but I don't want Spark to rebuild the RDD from that MapPartitionsRDD;
I want it to take the data directly from the checkpoint dir.
The MapPartitionsRDD has non-idempotent id generation, and I don't want it to
run twice in case of a downstream task failure.
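
One way to check how often a non-idempotent function really runs is to count its invocations with an accumulator. The sketch below is only an illustration: the local session, the temporary checkpoint directory, the UUID-based id generation and the helper name countIdGenerationCalls are all hypothetical. It persists the RDD before checkpointing, as the RDD.checkpoint documentation recommends, because otherwise writing the checkpoint requires recomputing the RDD. It then drops the cache and runs a second action to see whether the function is invoked again.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Returns (calls after the first action and checkpoint, calls after a second action).
def countIdGenerationCalls(): (Long, Long) = {
  // Local session for illustration only.
  val spark = SparkSession.builder().master("local[2]").appName("persist-before-checkpoint").getOrCreate()
  val sc = spark.sparkContext
  sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoints").toString)

  // Counts how many times the id-generating function actually runs.
  val calls = sc.longAccumulator("id generation calls")

  // Hypothetical stand-in for the non-idempotent id generation in the thread.
  val withIds = sc.parallelize(1 to 10, 2).map { i =>
    calls.add(1)
    (java.util.UUID.randomUUID().toString, i)
  }

  withIds.persist(StorageLevel.MEMORY_AND_DISK) // lets the checkpoint job reuse the computed partitions
  withIds.checkpoint()
  withIds.count()                               // first action: compute, cache, write checkpoint
  val afterCheckpoint = calls.sum

  withIds.unpersist(blocking = true)            // drop the cache so only the checkpoint remains
  withIds.count()                               // second action
  val afterSecondAction = calls.sum

  (afterCheckpoint, afterSecondAction)
}

val (afterCheckpoint, afterSecondAction) = countIdGenerationCalls()
println(s"calls after checkpoint: $afterCheckpoint, after second action: $afterSecondAction")
```

If the two numbers are equal, the second action was served from the checkpoint files without calling the function again. As far as I understand, that is what Spark does once the checkpoint has been materialized, but it is worth verifying on your own job and Spark version.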





Re: RDD which was checkpointed is not checkpointed

Posted by Jacob Lynn <ab...@gmail.com>.
Hi Ivan,

Unlike cache/persist, checkpoint does not operate in-place but requires the
result to be assigned to a new variable. In your case:

val recordsRDD = convertToRecords(anotherRDD).checkpoint()

Best,
Jacob
