You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Rinat <r....@cleverdata.ru> on 2017/10/30 15:01:22 UTC

Flink send checkpointing message in IT

Hi guys, I’ve got a question about working with checkpointing. 
I would like to implement IT test, where source is a fixed collection of items and sink performs additional logic, when checkpointing is completed.

I would like to force executing checkpointing, when all messages from my test source were sent and processed by sink.
Please tell me, whether such logic could be performed or not, and how.

Thx !

Re: Flink send checkpointing message in IT

Posted by Chesnay Schepler <ch...@apache.org>.
hmm. While there is /technically/ no guarantee that 
notifyCheckpointComplete is called, it virtually always is,
especially in local setups.

Is it possible for you to share more code (or all of it)? (you can also 
send it to me directly)

On 07.11.2017 11:58, Rinat wrote:
> Yes, but *notifyCheckpointComplete *callback doesn’t called on await 
> completion, I do the same, as in specified test template :
>
>         ActorGateway jobManager = (ActorGateway) 
> Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
>         Future<Object> savepointResultFuture = jobManager.ask(new 
> JobManagerMessages.TriggerSavepoint(
>             jobId, Option.<String>empty()), DEADLINE.timeLeft()
>         );
> while(!savepointResultFuture.isCompleted()) {
>             System.out.println();
>         }
>         Object savepointResult = Await.result(savepointResultFuture, 
> DEADLINE.timeLeft());
>
>         if (savepointResult instanceof 
> JobManagerMessages.TriggerSavepointFailure) {
>             throw new RuntimeException(String.format("Something went 
> wrong while executing savepoint, [message=%s]",
> ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause()
>             ));
>         }
>
> Thx
>
>> On 7 Nov 2017, at 13:54, Chesnay Schepler <chesnay@apache.org 
>> <ma...@apache.org>> wrote:
>>
>> Do you verify that savepointResult is a 
>> JobManagerMessages.TriggerSavepointSuccess? It could also be 
>> JobManagerMessages.TriggerSavepointFailure. (instanceof check)
>>
>> On 02.11.2017 19:11, Rinat wrote:
>>> Chesnay, thanks for your reply, it was very helpful, but I took 
>>> logic from this test template and tried to reuse it in my IT case, 
>>> but found one more issue.
>>> I’ve registered an accumulator in my source function, and for it’s 
>>> value, as specified in the specified example.
>>> When accumulator has an expected value, I perform a savepoint and 
>>> wait for it’s completion using the further code
>>>
>>> ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(),DEADLINE.timeLeft());
>>> Future<Object> savepointResultFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(
>>>      jobId, Option.<String>empty()),DEADLINE.timeLeft()
>>> );
>>> Object savepointResult = Await.result(savepointResultFuture,DEADLINE.timeLeft());
>>> Afterwards, if failures haven’t been detected I cancels my job and 
>>> shutdowns cluster.
>>>
>>> I found, that checkpoint method *notifyCheckpointComplete* not 
>>> always called, before the *savepointResult* is ready. So the part of 
>>> my logic, that lives in implementation of this method doesn’t work 
>>> and test fails.
>>>
>>> So could you or someone explain, does *Flink* guaranties, that 
>>> *notifyCheckpointComplete* method will be called before 
>>> *savepointResult * will become accessable.
>>> For me, it’s rather strange behaviour and I think that I’m doing 
>>> something wrong.
>>>
>>> Thx.
>>>
>>>> On 1 Nov 2017, at 14:26, Chesnay Schepler <chesnay@apache.org 
>>>> <ma...@apache.org>> wrote:
>>>>
>>>> You could trigger a savepoint, which from the viewpoint of 
>>>> sources/operators/sinks is the same thing as a checkpoint.
>>>>
>>>> How to do this depends a bit on how your test case is written, but 
>>>> you can take a look at the 
>>>> SavepointMigrationTestBase#executeAndSavepoint which is all about 
>>>> running josb and triggering
>>>> savepoints once certain conditions have been met.
>>>>
>>>> On 30.10.2017 16:01, Rinat wrote:
>>>>> Hi guys, I’ve got a question about working with checkpointing.
>>>>> I would like to implement IT test, where source is a fixed 
>>>>> collection of items and sink performs additional logic, when 
>>>>> checkpointing is completed.
>>>>>
>>>>> I would like to force executing checkpointing, when all messages 
>>>>> from my test source were sent and processed by sink.
>>>>> Please tell me, whether such logic could be performed or not, and how.
>>>>>
>>>>> Thx !
>>>>
>>>>
>>>
>>
>


Re: Flink send checkpointing message in IT

Posted by Rinat <r....@cleverdata.ru>.
Yes, but  notifyCheckpointComplete callback doesn’t called on await completion, I do the same, as in specified test template :

        ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
        Future<Object> savepointResultFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(
            jobId, Option.<String>empty()), DEADLINE.timeLeft()
        );
        while(!savepointResultFuture.isCompleted()) {
            System.out.println();
        }
        Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());

        if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
            throw new RuntimeException(String.format("Something went wrong while executing savepoint, [message=%s]",
                ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause()
            ));
        }

Thx

> On 7 Nov 2017, at 13:54, Chesnay Schepler <ch...@apache.org> wrote:
> 
> Do you verify that savepointResult is a JobManagerMessages.TriggerSavepointSuccess? It could also be JobManagerMessages.TriggerSavepointFailure. (instanceof check)
> 
> On 02.11.2017 19:11, Rinat wrote:
>> Chesnay, thanks for your reply, it was very helpful, but I took logic from this test template and tried to reuse it in my IT case, but found one more issue.
>> I’ve registered an accumulator in my source function, and for it’s value, as specified in the specified example.
>> When accumulator has an expected value, I perform a savepoint and wait for it’s completion using the further code
>> 
>> ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
>> Future<Object> savepointResultFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(
>>     jobId, Option.<String>empty()), DEADLINE.timeLeft()
>> );
>> Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
>> Afterwards, if failures haven’t been detected I cancels my job and shutdowns cluster.
>> 
>> I found, that checkpoint method notifyCheckpointComplete not always called, before the savepointResult is ready. So the part of my logic, that lives in implementation of this method doesn’t work and test fails.
>> 
>> So could you or someone explain, does Flink guaranties, that notifyCheckpointComplete method will be called before savepointResult  will become accessable.
>> For me, it’s rather strange behaviour and I think that I’m doing something wrong.
>> 
>> Thx.
>> 
>>> On 1 Nov 2017, at 14:26, Chesnay Schepler <chesnay@apache.org <ma...@apache.org>> wrote:
>>> 
>>> You could trigger a savepoint, which from the viewpoint of sources/operators/sinks is the same thing as a checkpoint.
>>> 
>>> How to do this depends a bit on how your test case is written, but you can take a look at the SavepointMigrationTestBase#executeAndSavepoint which is all about running josb and triggering
>>> savepoints once certain conditions have been met.
>>> 
>>> On 30.10.2017 16:01, Rinat wrote:
>>>> Hi guys, I’ve got a question about working with checkpointing.
>>>> I would like to implement IT test, where source is a fixed collection of items and sink performs additional logic, when checkpointing is completed.
>>>> 
>>>> I would like to force executing checkpointing, when all messages from my test source were sent and processed by sink.
>>>> Please tell me, whether such logic could be performed or not, and how.
>>>> 
>>>> Thx !
>>> 
>>> 
>> 
> 


Re: Flink send checkpointing message in IT

Posted by Chesnay Schepler <ch...@apache.org>.
Do you verify that savepointResult is a 
JobManagerMessages.TriggerSavepointSuccess? It could also be 
JobManagerMessages.TriggerSavepointFailure. (instanceof check)

On 02.11.2017 19:11, Rinat wrote:
> Chesnay, thanks for your reply, it was very helpful, but I took logic 
> from this test template and tried to reuse it in my IT case, but found 
> one more issue.
> I’ve registered an accumulator in my source function, and for it’s 
> value, as specified in the specified example.
> When accumulator has an expected value, I perform a savepoint and wait 
> for it’s completion using the further code
>
> ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(),DEADLINE.timeLeft());
> Future<Object> savepointResultFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(
>      jobId, Option.<String>empty()),DEADLINE.timeLeft()
> );
> Object savepointResult = Await.result(savepointResultFuture,DEADLINE.timeLeft());
> Afterwards, if failures haven’t been detected I cancels my job and 
> shutdowns cluster.
>
> I found, that checkpoint method *notifyCheckpointComplete* not always 
> called, before the *savepointResult* is ready. So the part of my 
> logic, that lives in implementation of this method doesn’t work and 
> test fails.
>
> So could you or someone explain, does *Flink* guaranties, that 
> *notifyCheckpointComplete* method will be called before 
> *savepointResult * will become accessable.
> For me, it’s rather strange behaviour and I think that I’m doing 
> something wrong.
>
> Thx.
>
>> On 1 Nov 2017, at 14:26, Chesnay Schepler <chesnay@apache.org 
>> <ma...@apache.org>> wrote:
>>
>> You could trigger a savepoint, which from the viewpoint of 
>> sources/operators/sinks is the same thing as a checkpoint.
>>
>> How to do this depends a bit on how your test case is written, but 
>> you can take a look at the 
>> SavepointMigrationTestBase#executeAndSavepoint which is all about 
>> running josb and triggering
>> savepoints once certain conditions have been met.
>>
>> On 30.10.2017 16:01, Rinat wrote:
>>> Hi guys, I’ve got a question about working with checkpointing.
>>> I would like to implement IT test, where source is a fixed 
>>> collection of items and sink performs additional logic, when 
>>> checkpointing is completed.
>>>
>>> I would like to force executing checkpointing, when all messages 
>>> from my test source were sent and processed by sink.
>>> Please tell me, whether such logic could be performed or not, and how.
>>>
>>> Thx !
>>
>>
>


Re: Flink send checkpointing message in IT

Posted by Rinat <r....@cleverdata.ru>.
Chesnay, thanks for your reply, it was very helpful, but I took logic from this test template and tried to reuse it in my IT case, but found one more issue.
I’ve registered an accumulator in my source function, and for it’s value, as specified in the specified example.
When accumulator has an expected value, I perform a savepoint and wait for it’s completion using the further code

ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
Future<Object> savepointResultFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(
    jobId, Option.<String>empty()), DEADLINE.timeLeft()
);
Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
Afterwards, if failures haven’t been detected I cancels my job and shutdowns cluster.

I found, that checkpoint method notifyCheckpointComplete not always called, before the savepointResult is ready. So the part of my logic, that lives in implementation of this method doesn’t work and test fails.

So could you or someone explain, does Flink guaranties, that notifyCheckpointComplete method will be called before savepointResult  will become accessable.
For me, it’s rather strange behaviour and I think that I’m doing something wrong.

Thx.

> On 1 Nov 2017, at 14:26, Chesnay Schepler <ch...@apache.org> wrote:
> 
> You could trigger a savepoint, which from the viewpoint of sources/operators/sinks is the same thing as a checkpoint.
> 
> How to do this depends a bit on how your test case is written, but you can take a look at the SavepointMigrationTestBase#executeAndSavepoint which is all about running josb and triggering
> savepoints once certain conditions have been met.
> 
> On 30.10.2017 16:01, Rinat wrote:
>> Hi guys, I’ve got a question about working with checkpointing.
>> I would like to implement IT test, where source is a fixed collection of items and sink performs additional logic, when checkpointing is completed.
>> 
>> I would like to force executing checkpointing, when all messages from my test source were sent and processed by sink.
>> Please tell me, whether such logic could be performed or not, and how.
>> 
>> Thx !
> 
> 


Re: Flink send checkpointing message in IT

Posted by Chesnay Schepler <ch...@apache.org>.
You could trigger a savepoint, which from the viewpoint of 
sources/operators/sinks is the same thing as a checkpoint.

How to do this depends a bit on how your test case is written, but you 
can take a look at the SavepointMigrationTestBase#executeAndSavepoint 
which is all about running josb and triggering
savepoints once certain conditions have been met.

On 30.10.2017 16:01, Rinat wrote:
> Hi guys, I’ve got a question about working with checkpointing.
> I would like to implement IT test, where source is a fixed collection of items and sink performs additional logic, when checkpointing is completed.
>
> I would like to force executing checkpointing, when all messages from my test source were sent and processed by sink.
> Please tell me, whether such logic could be performed or not, and how.
>
> Thx !