Posted to user@flink.apache.org by Stephan Ewen <se...@apache.org> on 2018/03/01 17:07:15 UTC

Re: Which test cluster to use for checkpointing tests?

@Nico This has nothing to do with the DataSet API. The DataStream API
supports finite programs as well.

@Ken The issue you are running into is that checkpointing currently works
only until the job reaches the point where the pipeline starts to drain,
that is, when the sources are done. In your case, the source is done
immediately, sending out only one tuple.
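
As a rough illustration of that rule, here is a toy model in plain Java. It
is not Flink's actual CheckpointCoordinator code; the class, enum, and
method names are made up for this sketch. The only behaviour it models is
the one described above and visible in the quoted log: a checkpoint is
triggered only while every source task is running, and is aborted otherwise.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model (hypothetical names): a checkpoint can only be triggered
// while all source tasks are still running.
public class CheckpointTriggerModel {

    // Simplified task states; a real runtime has more of them.
    public enum State { RUNNING, FINISHED }

    private final Map<String, State> sourceTasks = new LinkedHashMap<>();

    public void setState(String taskName, State state) {
        sourceTasks.put(taskName, state);
    }

    // Returns true if the checkpoint would be triggered, false if aborted.
    public boolean tryTriggerCheckpoint() {
        for (Map.Entry<String, State> task : sourceTasks.entrySet()) {
            if (task.getValue() != State.RUNNING) {
                System.out.println("Checkpoint triggering task " + task.getKey()
                        + " is not being executed at the moment. Aborting checkpoint.");
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        CheckpointTriggerModel model = new CheckpointTriggerModel();
        model.setState("Source: Seed urls source (1/2)", State.RUNNING);
        model.setState("Source: Seed urls source (2/2)", State.RUNNING);
        System.out.println(model.tryTriggerCheckpoint());

        // Once a source is done, every later attempt is aborted.
        model.setState("Source: Seed urls source (1/2)", State.FINISHED);
        System.out.println(model.tryTriggerCheckpoint());
    }
}
```

Running main prints true for the first attempt, then the abort message and
false once one of the sources has finished.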

Running checkpoints with closed sources is something that's on the feature
list and will come soon...
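
Until then, one possible workaround for tests is to keep the source alive
after it has emitted its seed tuple, so that the job never reaches the
draining phase. The sketch below shows only the run/cancel pattern in plain
Java. The class name and the Consumer-based emit callback are placeholders,
not Flink API; in a real job this logic would live in a Flink
SourceFunction, whose run() and cancel() methods play the same roles.
Whether this is enough for a job with iterations is an assumption that has
not been verified here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a streaming source that stays alive after
// emitting a single seed element, until it is explicitly cancelled.
public class KeepAliveSeedSource {

    private final String seed;
    private volatile boolean running = true;

    public KeepAliveSeedSource(String seed) {
        this.seed = seed;
    }

    // Emits the seed once, then idles until cancel() is called.
    public void run(Consumer<String> out) throws InterruptedException {
        out.accept(seed);
        while (running) {
            Thread.sleep(10L);
        }
    }

    // Asks run() to return; safe to call from another thread.
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        KeepAliveSeedSource source = new KeepAliveSeedSource("http://example.com/");

        Thread sourceThread = new Thread(() -> {
            try {
                source.run(emitted::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sourceThread.start();

        Thread.sleep(100L);
        System.out.println("still running: " + sourceThread.isAlive());

        source.cancel();
        sourceThread.join();
        System.out.println("emitted: " + emitted);
    }
}
```

The test then has to cancel the job itself once it has seen the checkpoint
calls it was waiting for, since the source no longer ends on its own.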


On Wed, Feb 28, 2018 at 4:02 PM, Nico Kruber <ni...@data-artisans.com> wrote:

> I was a bit confused when you said that the "source is done", which
> is when I realized you must be using the batch API, for which
> checkpointing is not available / needed. Let me quote from [1], which
> IMHO has not changed:
>
> DataSet:
>
> Fault tolerance for the DataSet API works by restarting the job and
> redoing all of the work. [...] The periodic in-flight checkpoints are
> not used here.
>
> DataStream:
>
> This one would start immediately inserting data (as it is a streaming
> job), and draw periodic checkpoints that make sure replay-on-failure
> has to redo only a bit, not everything.
>
>
> Nico
>
> [1]
> https://lists.apache.org/thread.html/3121ad01f5adf4246aa035dfb886af534b063963dee0f86d63b675a1@1447086324@%3Cuser.flink.apache.org%3E
>
> On 26/02/18 22:55, Ken Krugler wrote:
> > Hi Nico,
> >
> >> On Feb 26, 2018, at 9:41 AM, Nico Kruber <ni...@data-artisans.com> wrote:
> >>
> >> Hi Ken,
> >> LocalFlinkMiniCluster should run checkpoints just fine. It looks like it
> >> was attempting to even create one but could not finish. Maybe your
> >> program was not fully running yet?
> >
> > In the logs I see:
> >
> > 18/02/23 12:40:50 INFO taskmanager.Task:957 - Source: Seed urls source (1/2) (56fdede2f4783455b4ab8f290e700baa) switched from DEPLOYING to RUNNING.
> > 18/02/23 12:40:50 DEBUG tasks.StreamTask:214 - Initializing Source: Seed urls source (1/2).
> > 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.
> >
> > Maybe the checkpoint here is happening too soon after the “Initializing
> > Source” message.
> >
> > After that the source is done (it only triggers the iteration with a
> > single starting tuple), so I wouldn’t expect checkpointing to actually
> > do anything. I was just using these messages as indications that I had
> > configured my workflow properly to actually do checkpointing.
> >
> >> Can you tell us a little bit more about your set up and how you
> >> configured the LocalFlinkMiniCluster?
> >
> > Potential issue #1 - I’ve got a workflow with multiple iterations.
> >
> > For that reason I had to force checkpointing via:
> >
> >     env.setStateBackend(new MemoryStateBackend());
> >     env.enableCheckpointing(100L, CheckpointingMode.AT_LEAST_ONCE, true);
> >
> >
> > Potential issue #2 - because of the fun with tracking iteration
> > progress, I subclass LocalStreamEnvironment to add this async execution
> > method:
> >
> > public JobSubmissionResult executeAsync(String jobName) throws Exception {
> >     // transform the streaming program into a JobGraph
> >     StreamGraph streamGraph = getStreamGraph();
> >     streamGraph.setJobName(jobName);
> >
> >     JobGraph jobGraph = streamGraph.getJobGraph();
> >
> >     Configuration configuration = new Configuration();
> >     configuration.addAll(jobGraph.getJobConfiguration());
> >
> >     configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
> >     configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
> >         jobGraph.getMaximumParallelism());
> >
> >     // add (and override) the settings with what the user defined
> >     configuration.addAll(_conf);
> >
> >     _exec = new LocalFlinkMiniCluster(configuration, true);
> >     _exec.start(true);
> >
> >     // The above code is all basically the same as Flink's LocalStreamEnvironment.
> >     // The change is that here we call submitJobDetached vs. submitJobAndWait.
> >     // We assume that eventually someone calls stop(job id), which then
> >     // terminates the LocalFlinkMiniCluster.
> >     return _exec.submitJobDetached(jobGraph);
> > }
> >
> > However I don’t think that would impact checkpointing.
> >
> > Anything else I should do to debug whether checkpointing is operating as
> > expected? In the logs, at DEBUG level, I don’t see any errors or
> > warnings related to this.
> >
> > Thanks,
> >
> > — Ken
> >
> >>
> >>
> >> Nico
> >>
> >> On 23/02/18 21:42, Ken Krugler wrote:
> >>> Hi all,
> >>>
> >>> For testing checkpointing, is it possible to use LocalFlinkMiniCluster?
> >>>
> >>> Asking because I’m not seeing checkpoint calls being made to my
> >>> custom function (implements ListCheckpointed) when I’m running with
> >>> LocalFlinkMiniCluster.
> >>>
> >>> Though I do see entries like this logged:
> >>>
> >>> 18/02/23 12:40:50 INFO jobmanager.JobManager:246 - Using application-defined state backend for checkpoint/savepoint metadata: MemoryStateBackend (data in heap memory / checkpoints to JobManager).
> >>> 18/02/23 12:40:50 INFO checkpoint.CheckpointCoordinator:525 - Checkpoint triggering task Source: Seed urls source (1/2) is not being executed at the moment. Aborting checkpoint.
> >>>
> >>> But when I browse the Flink source, tests for checkpointing seem to
> >>> be using TestCluster, e.g. in ResumeCheckpointManuallyITCase
> >>>
> >>> Thanks,
> >>>
> >>> — Ken
> >>>
> >>> --------------------------------------------
> >>> http://about.me/kkrugler
> >>> +1 530-210-6378
> >>>
> >>
> >
> > --------------------------------------------
> > http://about.me/kkrugler
> > +1 530-210-6378
> >
>
>

Re: Which test cluster to use for checkpointing tests?

Posted by Aljoscha Krettek <al...@apache.org>.
FYI, this is the Jira issue for tracking the issue: https://issues.apache.org/jira/browse/FLINK-2491

Aljoscha

> On 6. Mar 2018, at 02:32, Nico Kruber <ni...@data-artisans.com> wrote:
> 
> There are still some upcoming changes for the network stack, but most of
> the heavy stuff is already through - you may track this under
> https://issues.apache.org/jira/browse/FLINK-8581
> 
> FLIP-6 is somewhat similar and is currently only undergoing stability
> improvements and bug fixes. The architectural changes are merged now.
> 
> 
> Nico


Re: Which test cluster to use for checkpointing tests?

Posted by Nico Kruber <ni...@data-artisans.com>.
There are still some upcoming changes for the network stack, but most of
the heavy stuff is already through - you may track this under
https://issues.apache.org/jira/browse/FLINK-8581

FLIP-6 is somewhat similar and is currently only undergoing stability
improvements and bug fixes. The architectural changes are merged now.


Nico

On 06/03/18 11:24, Paris Carbone wrote:
> Hey,
> 
> Indeed, checkpointing iterations and dealing with closed sources are orthogonal issues, which is why the latter is not part of FLIP-15. Still, you kind of need both to have meaningful checkpoints for jobs with iterations.
> One has to do with correctness (checkpointing strongly connected components in the execution graph) and the other with termination (terminating the checkpointing protocol when certain tasks ‘finish’).
> 
> I am willing to help resolve the first issue, though I would prefer to wait until the ongoing changes to the network model and FLIP-6 are finalised before applying this change properly (are they?).
> 
> Paris


Re: Which test cluster to use for checkpointing tests?

Posted by Paris Carbone <pa...@kth.se>.
Hey,

Indeed, checkpointing iterations and dealing with closed sources are orthogonal issues, which is why the latter is not part of FLIP-15. Still, you kind of need both to have meaningful checkpoints for jobs with iterations.
One has to do with correctness (checkpointing strongly connected components in the execution graph) and the other with termination (terminating the checkpointing protocol when certain tasks ‘finish’).

I am willing to help resolve the first issue, though I would prefer to wait until the ongoing changes to the network model and FLIP-6 are finalised before applying this change properly (are they?).

Paris

> On 6 Mar 2018, at 10:51, Nico Kruber <ni...@data-artisans.com> wrote:
> 
> Hi Ken,
> sorry, I was misled by the fact that you are using iterations and those
> were only documented for the DataSet API.
> 
> Running checkpoints with closed sources sounds like a more general thing
> than being part of the iterations rework of FLIP-15. I couldn't dig up
> anything on Jira regarding this improvement either.
> 
> @Stephan: is this documented somewhere?
> 
> 
> Nico


Re: Which test cluster to use for checkpointing tests?

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Ken,
sorry, I was misled by the fact that you are using iterations and those
were only documented for the DataSet API.

Running checkpoints with closed sources sounds like a more general thing
than being part of the iterations rework of FLIP-15. I couldn't dig up
anything on Jira regarding this improvement either.

@Stephan: is this documented somewhere?


Nico

On 02/03/18 23:55, Ken Krugler wrote:
> Hi Stephan,
> 
> Thanks for the update.
> 
> So is support for “running checkpoints with closed sources” part
> of FLIP-15
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132>,
> or something separate?
> 
> Regards,
> 
> — Ken


Re: Which test cluster to use for checkpointing tests?

Posted by Ken Krugler <kk...@transpac.com>.
Hi Stephan,

Thanks for the update.

So is support for “running checkpoints with closed sources” part of FLIP-15 <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132>, or something separate?

Regards,

— Ken

> On Mar 1, 2018, at 9:07 AM, Stephan Ewen <se...@apache.org> wrote:
> 
> @Ken The issue you are running into is that Checkpointing works currently only until the job reaches the point where the pipeline starts to drain out, meaning when the sources are done. In your case, the source is done immediately, sending out only one tuple.
> 
> Running checkpoints with closed sources is something that's on the feature list and will come soon…

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378