You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Marchant, Hayden " <ha...@citi.com> on 2017/09/13 13:30:28 UTC

Testing recoverable job state

I'm a newbie to Flink and am trying to understand how the recovery works using state backends. I've read the documentation and am now trying to run a simple test to demonstrate the abilities - I'd like to test the recovery of a flink job and how the state is recovered from where it left off when 'disaster hit'. 

Please note that this whole test is being done on a Windows workstation through my IDE. I am running a LocalFlinkMiniCluster and have enabled checkpointing using FsStateBackend. I am using Kafka as a source. When running this Flink job, I see that a new directory is created within the FsStateBackend base directory with a randomly generated JobID. I assume that if a Task fails within the job, the state stored in the backend will be used to restart the relevant Operator instances from the recent checkpoint. I have tried simulating this by throwing an exception in one of the operators,  though I'm not sure what the expected functionality is now - will the Task be killed, or just that 'bad' tuple will be ignored?

Also, and more importantly, I would like to simulate a more 'drastic' failure - that of my whole Flink cluster going down. In my test I would do this simply by killing my single LocalFlinkMiniCluster process.  In that case, I would like my job to resume when I restart the Flink cluster. However, when I do that, my could launches a new job, with same code, but running with a new Job ID. How do I get it to run with the same Job ID so that it can use the stored state to recover?

Am I approaching this test in the right way? If not, please give me some pointers to better simulate a real system. (Note that in a real system, we would like to run on a single node cluster.)

Thanks,
Hayden Marchant



Re: Testing recoverable job state

Posted by Robert Metzger <rm...@apache.org>.
Hi,

1. Question: When you are throwing an exception within your user code,
Flink will cancel the execution of all tasks and schedule them again (if
you've configured a restart strategy).

2. Question: You'll need to configure the MiniCluster in HA mode. I believe
that should be possible by passing the HA configuration parameters into the
MiniCluster. But it requires you to have a Zookeeper available somewhere.
Running Flink like this in a single JVM is definitively possible. You might
even consider running these single JVMs on two separate machines
concurrently. With HA properly configured, the "standby" one will take over.


Regards,
Robert



On Wed, Sep 13, 2017 at 3:30 PM, Marchant, Hayden <ha...@citi.com>
wrote:

> I'm a newbie to Flink and am trying to understand how the recovery works
> using state backends. I've read the documentation and am now trying to run
> a simple test to demonstrate the abilities - I'd like to test the recovery
> of a flink job and how the state is recovered from where it left off when
> 'disaster hit'.
>
> Please note that this whole test is being done on a Windows workstation
> through my IDE. I am running a LocalFlinkMiniCluster and have enabled
> checkpointing using FsStateBackend. I am using Kafka as a source. When
> running this Flink job, I see that a new directory is created within the
> FsStateBackend base directory with a randomly generated JobID. I assume
> that if a Task fails within the job, the state stored in the backend will
> be used to restart the relevant Operator instances from the recent
> checkpoint. I have tried simulating this by throwing an exception in one of
> the operators,  though I'm not sure what the expected functionality is now
> - will the Task be killed, or just that 'bad' tuple will be ignored?
>
> Also, and more importantly, I would like to simulate a more 'drastic'
> failure - that of my whole Flink cluster going down. In my test I would do
> this simply by killing my single LocalFlinkMiniCluster process.  In that
> case, I would like my job to resume when I restart the Flink cluster.
> However, when I do that, my could launches a new job, with same code, but
> running with a new Job ID. How do I get it to run with the same Job ID so
> that it can use the stored state to recover?
>
> Am I approaching this test in the right way? If not, please give me some
> pointers to better simulate a real system. (Note that in a real system, we
> would like to run on a single node cluster.)
>
> Thanks,
> Hayden Marchant
>
>
>