You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Juho Autio <ju...@rovio.com> on 2018/05/14 11:00:12 UTC

Missing MapState when Timer fires after restored state

We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
state. After restoring state from a checkpoint, it seems like a timer had
been restored, but not the data that was expected to be in a related
MapState if such timer has been added.

The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or
maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete

Our code (simplified):

    private MapState<String, String> mapState;

    public void processElement(..) {
            mapState.put("lastUpdated", ctx.timestamp().toString());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
stateRetentionMillis);
    }

    public void onTimer(long timestamp, OnTimerContext ctx, ..) {
        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {
            mapState.clear();
        }
    }

Normally this "just works". As you can see, it shouldn't be possible that
"lastUpdated" doesn't exist in state if timer was registered and onTimer
gets called.

However, after restoring state from a checkpoint, the job kept failing with
this error:

Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..

So apparently onTimer was called but lastUpdated wasn't found in the
MapState.

The background for restoring state in this case is not entirely clean.
There was an OS level issue "Too many open files" after running a job for
~11 days. To fix that, we replaced the cluster with a new one and launched
the Flink job again. State was successfully restored from the latest
checkpoint that had been created by the "problematic execution". Now, I'm
assuming that if the state wouldn't have been created successfully,
restoring wouldn't succeed either – correct? This is just to rule out that
the issue with state didn't happen because the checkpoint files were
somehow corrupted due to the Too many open files problem.

Thank you all for your continued support!

P.S. I would be very much interested to hear if there's some cleaner way to
achieve this kind of TTL for keyed state in Flink.

Re: Missing MapState when Timer fires after restored state

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Juho,

As Sihua said, this shouldn't happen and indicates a bug. Did you only encounter this once or can you easily reproduce the problem?

Best,
Aljoscha

> On 15. May 2018, at 05:57, sihua zhou <su...@163.com> wrote:
> 
> Hi Juho,
> in fact, from your code I can't see any possible that the MapState could be inconsistency with the timer, it's looks like a bug to me, because once the checkpoint's complete and you haven't query the state in a customer thread async, then the result of the checkpoint should be consistency. The only case, I can see where the timer could be inconsistency with state is when the task is shutting down, that case the backend maybe already closed but the timer failed to shutdown, so that the time callback function may access a closed backend. But it shouldn't be reason of your case. Maybe, could you please provide us more information, like what type of backend are you using? are you using the RocksDBBackend? and I think @Stefan may tell more about this, and please correct me if I'm incorrect.
> 
> Best,
> Sihua
> 
> On 05/15/2018 01:48,Bowen Li<bo...@gmail.com> <ma...@gmail.com> wrote: 
> Hi Juho,
> 
> You are right, there's no transactional guarantee on timers and state in processElement(). They may end up with inconsistency if your job was cancelled in the middle of processing an element.
> 
> To avoid the situation, the best programming practice is to always check if the state you're trying to get is null or not.
> 
> I've also created https://issues.apache.org/jira/browse/FLINK-9362 <https://issues.apache.org/jira/browse/FLINK-9362> to document this. 
> 
> Thanks
> Bowen
> 
> 
> 
> On Mon, May 14, 2018 at 4:00 AM, Juho Autio <juho.autio@rovio.com <ma...@rovio.com>> wrote:
> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.
> 
> The way I see this is that there's a bug, either of these:
> - The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
> 
> Our code (simplified):
> 
>     private MapState<String, String> mapState;
> 
>     public void processElement(..) {
>             mapState.put("lastUpdated", ctx.timestamp().toString());
>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>     }
> 
>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>             mapState.clear();
>         }
>     }
> 
> Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.
> 
> However, after restoring state from a checkpoint, the job kept failing with this error:
> 
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
> ..
> 
> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
> 
> The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.
> 
> Thank you all for your continued support!
> 
> P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.
> 
> 


Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi Juho,
in fact, from your code I can't see any possible that the MapState could be inconsistency with the timer, it's looks like a bug to me, because once the checkpoint's complete and you haven't query the state in a customer thread async, then the result of the checkpoint should be consistency. The only case, I can see where the timer could be inconsistency with state is when the task is shutting down, that case the backend maybe already closed but the timer failed to shutdown, so that the time callback function may access a closed backend. But it shouldn't be reason of your case. Maybe, could you please provide us more information, like what type of backend are you using? are you using the RocksDBBackend? and I think @Stefan may tell more about this, and please correct me if I'm incorrect.


Best,
Sihua


On 05/15/2018 01:48,Bowen Li<bo...@gmail.com> wrote:
Hi Juho,


You are right, there's no transactional guarantee on timers and state in processElement(). They may end up with inconsistency if your job was cancelled in the middle of processing an element.


To avoid the situation, the best programming practice is to always check if the state you're trying to get is null or not.


I've also created https://issues.apache.org/jira/browse/FLINK-9362 to document this. 


Thanks
Bowen






On Mon, May 14, 2018 at 4:00 AM, Juho Autio <ju...@rovio.com> wrote:

We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.



The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete


Our code (simplified):


    private MapState<String, String> mapState;


    public void processElement(..) {
            mapState.put("lastUpdated", ctx.timestamp().toString());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
    }


    public void onTimer(long timestamp, OnTimerContext ctx, ..) {

        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {
            mapState.clear();
        }
    }


Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.


However, after restoring state from a checkpoint, the job kept failing with this error:


Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..


So apparently onTimer was called but lastUpdated wasn't found in the MapState.


The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.



Thank you all for your continued support!


P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.


Re: Missing MapState when Timer fires after restored state

Posted by Bowen Li <bo...@gmail.com>.
Hi Juho,

You are right, there's no transactional guarantee on timers and state in
processElement(). They may end up with inconsistency if your job was
cancelled in the middle of processing an element.

To avoid the situation, the best programming practice is to always check if
the state you're trying to get is null or not.

I've also created https://issues.apache.org/jira/browse/FLINK-9362 to
document this.

Thanks
Bowen



On Mon, May 14, 2018 at 4:00 AM, Juho Autio <ju...@rovio.com> wrote:

> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
> state. After restoring state from a checkpoint, it seems like a timer had
> been restored, but not the data that was expected to be in a related
> MapState if such timer has been added.
>
> The way I see this is that there's a bug, either of these:
> - The writing of timers & map states to Flink state is not synchronized
> (or maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
>
> Our code (simplified):
>
>     private MapState<String, String> mapState;
>
>     public void processElement(..) {
>             mapState.put("lastUpdated", ctx.timestamp().toString());
>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
> stateRetentionMillis);
>     }
>
>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>             mapState.clear();
>         }
>     }
>
> Normally this "just works". As you can see, it shouldn't be possible that
> "lastUpdated" doesn't exist in state if timer was registered and onTimer
> gets called.
>
> However, after restoring state from a checkpoint, the job kept failing
> with this error:
>
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.
> java:136)
> ..
>
> So apparently onTimer was called but lastUpdated wasn't found in the
> MapState.
>
> The background for restoring state in this case is not entirely clean.
> There was an OS level issue "Too many open files" after running a job for
> ~11 days. To fix that, we replaced the cluster with a new one and launched
> the Flink job again. State was successfully restored from the latest
> checkpoint that had been created by the "problematic execution". Now, I'm
> assuming that if the state wouldn't have been created successfully,
> restoring wouldn't succeed either – correct? This is just to rule out that
> the issue with state didn't happen because the checkpoint files were
> somehow corrupted due to the Too many open files problem.
>
> Thank you all for your continued support!
>
> P.S. I would be very much interested to hear if there's some cleaner way
> to achieve this kind of TTL for keyed state in Flink.
>

Re: Missing MapState when Timer fires after restored state

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I think that is very good to know and fix. It feels a bit like a not so nice API design in RocksDB that iterators are required to check on two methods and the documentation of this is also newer than most of our RocksDB code, so an update there clearly makes sense.

@Sihua: if you want to fix this problem, can you also search for other usages of this `isValid` flag that should be covered. I will review a PR as soon as I can.

Best,
Stefan 

> Am 16.05.2018 um 08:40 schrieb sihua zhou <su...@163.com>:
> 
> Hi,
> I have a bref loop of the code that related to the restoring of incremental checkpoint, not abvious bug could be found. But there is a suspicious loophole that may lead to data loss, the suspicious code is pasted below.
> 
> 
> while (iterator.isValid()) {
> 
>    int keyGroup = 0;
>    for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
>       keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
>    }
> 
>    if (stateBackend.keyGroupRange.contains(keyGroup)) {
>       stateBackend.db.put(targetColumnFamilyHandle,
>          iterator.key(), iterator.value());
>    }
> 
>    iterator.next();
> }
> 
> we only use the iterator.isValid() to check whether we have reached the end of the iterator, but if we refer to RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling <https://github.com/facebook/rocksdb/wiki/Iterator#error-handling> we can find that iterator.isValid() is indeed not enough, it may return false may also because the there is a internal error of RocksDB. So, a safer way is to called iterator.status() to check whether everthing is ok. I'm a bit want to fire a PR for this now, because in RocksDBMapState (we guarantee to support) we also use the iterator without checking the  iterator.status().
> @Stefan What do you think?  
> 
> Best, Sihua
> On 05/16/2018 10:22,sihua zhou<su...@163.com> <ma...@163.com> wrote: 
> Hi Juho,
> if I'm not misunderstand, you saied your're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet, you can find this on the doc https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints <https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints>. So, I not sure whether this is a "bug" at current stage(personally I'd like to dig it out because currently we also use the checkpoint like the way you are) ...
> 
> Best, Sihua
> 
> On 05/16/2018 01:46,Juho Autio<ju...@rovio.com> <ma...@rovio.com> wrote: 
> I was able to reproduce this error.
> 
> I just happened to notice an important detail about the original failure:
> - checkpoint was created with a 1-node cluster (parallelism=8)
> - restored on a 2-node cluster (parallelism=16), caused that null exception
> 
> I tried restoring again from the problematic checkpoint again
> - restored on a 1-node cluster, no problems
> - restored on a 2-node cluster, getting the original error!
> 
> So now I have a way to reproduce the bug. To me it seems like the checkpoint itself is fine. The bug seems to be in redistributing the state of a restored checkpoint to a higher parallelism. I only tested each cluster size once (as described above) so it could also be coincidence, but seems at least likely now that it's about the state redistribution.
> 
> I'll try to follow up with those TRACE-level logs tomorrow. Today I tried adding these to the logback.xml, but I didn't get anything else but INFO level logs:
> 
>     <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.runtime.state" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
> 
> Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink 1.5-SNAPSHOT and the package has all of these in the conf/ dir:
> 
> log4j-cli.properties
> log4j-console.properties
> log4j.properties
> log4j-yarn-session.properties
> logback-console.xml
> logback.xml
> logback-yarn.xml
> 
> On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
> Hi,
> 
>> Am 15.05.2018 um 10:34 schrieb Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>> 
>> Ok, that should be possible to provide. Are there any specific packages to set on trace level? Maybe just go with org.apache.flink.* on TRACE?
> 
> The following packages would be helpful:
> 
> org.apache.flink.contrib.streaming.state.*
> org.apache.flink.runtime.state.*
> org.apache.flink.runtime.checkpoint.*
> org.apache.flink.streaming.api.operators.*
> org.apache.flink.streaming.runtime.tasks.*
> 
>> 
>> > did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files)
>> 
>> I think it happened in various places, maybe not when restoring.. Any way if the situation is like that, the system is pretty much unusable (on OS level), so it shouldn't matter too much which operation of the application it causes to fail? Any way I'll try to grab & share all log lines that say "Too Many Open Files"..
>> 
>> > and did you deactivate it on the second cluster for the restart or changed your OS settings?
>> 
>> No, didn't change anything except for increasing the ulimit on OS to prevent this from happening again. Note that the system only ran out of files after ~11 days of uptime. During that time there had been some local recoveries. This makes me wonder though, could it be that many local recoveries eventually caused this – could it be that in the occasion of local recovery some "old" files are left open, making the system eventually run out of files?
> 
> 
> From the way how local recovery works with incremental RocksDB checkpoints, I would not assume that it is the cause of the problem. In this particular case, the number of opened files on a local FS should not be higher than the number without local recovery. Maybe it is just a matter of the OS limit and the number of operators with a RocksDB backend running on the machine and the amount of files managed by all those RocksDB instances that simply exceed the limit. If you have an overview how many parallel operator instances with keyed state were running on the machine and assume some reasonable number of files per RocksDB instance and the limit configured in your OS, could that be the case?
> 
>> 
>> Thanks!
> 
> Thanks for your help!
> 
>> 
>> On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
>> Btw having a trace level log of a restart from a problematic checkpoint could actually be helpful if we cannot find the problem from the previous points. This can give a more detailed view of what checkpoint files are mapped to which operator.
>> 
>> I am having one more question: did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files), and did you deactivate it on the second cluster for the restart or changed your OS settings?
>> 
>> 
>>> Am 15.05.2018 um 10:09 schrieb Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>> 
>>> What I would like to see from the logs is (also depending a bit on your log level):
>>> 
>>> - all exceptions.
>>> - in which context exactly the „too many open files“ problem occurred, because I think for checkpoint consistency it should not matter as a checkpoint with such a problem should never succeed.
>>> - files that are written for checkpoints/savepoints.
>>> - completed checkpoints/savepoints ids.
>>> - the restored checkpoint/savepoint id.
>>> - files that are loaded on restore.
>>> 
>>>> Am 15.05.2018 um 10:02 schrieb Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>>>> 
>>>> Thanks all. I'll have to see about sharing the logs & configuration..
>>>> 
>>>> Is there something special that you'd like to see from the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.
>>>> 
>>>> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external state path on s3.
>>>> 
>>>> The code that we use is:
>>>> 
>>>>             env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
>>>>             env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
>>>>             env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
>>>>             env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>>>>             env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>> 
>>>> The problematic state that we tried to use was a checkpoint created with this conf.
>>>> 
>>>> > Are you using the local recovery feature?
>>>> 
>>>> Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.
>>>> 
>>>> This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on s3, so I might be able to try to restore it again if needed to debug it. But that would require some tweaking, because I don't want to tangle with the same kafka consumer group offsets or send old data again to production endpoint.
>>>> 
>>>> Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.
>>>> 
>>>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
>>>> Hi,
>>>> 
>>>> I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>> 
>>>>> Am 14.05.2018 um 13:00 schrieb Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>>>>> 
>>>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.
>>>>> 
>>>>> The way I see this is that there's a bug, either of these:
>>>>> - The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
>>>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>>>> 
>>>>> Our code (simplified):
>>>>> 
>>>>>     private MapState<String, String> mapState;
>>>>> 
>>>>>     public void processElement(..) {
>>>>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>>>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>>>>>     }
>>>>> 
>>>>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>>>             mapState.clear();
>>>>>         }
>>>>>     }
>>>>> 
>>>>> Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.
>>>>> 
>>>>> However, after restoring state from a checkpoint, the job kept failing with this error:
>>>>> 
>>>>> Caused by: java.lang.NumberFormatException: null
>>>>> at java.lang.Long.parseLong(Long.java:552)
>>>>> at java.lang.Long.parseLong(Long.java:631)
>>>>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>>>>> ..
>>>>> 
>>>>> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
>>>>> 
>>>>> The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.
>>>>> 
>>>>> Thank you all for your continued support!
>>>>> 
>>>>> P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.
>>>> 
>>>> 
>>>> 
>>> 
>> 
>> 
>> 
> 
> 
> 
> 


Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi,
I have a bref loop of the code that related to the restoring of incremental checkpoint, not abvious bug could be found. But there is a suspicious loophole that may lead to data loss, the suspicious code is pasted below.



while (iterator.isValid()) {

int keyGroup = 0;
   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
      keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
}

if (stateBackend.keyGroupRange.contains(keyGroup)) {
stateBackend.db.put(targetColumnFamilyHandle,
iterator.key(), iterator.value());
}

   iterator.next();
}


we only use the iterator.isValid() to check whether we have reached the end of the iterator, but if we refer to RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we can find that iterator.isValid() is indeed not enough, it may return false may also because the there is a internal error of RocksDB. So, a safer way is to called iterator.status() to check whether everthing is ok. I'm a bit want to fire a PR for this now, because in RocksDBMapState (we guarantee to support) we also use the iterator without checking the  iterator.status().
@Stefan What do you think?  


Best, Sihua
On 05/16/2018 10:22,sihua zhou<su...@163.com> wrote:
Hi Juho,
if I'm not misunderstand, you saied your're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet, you can find this on the doc https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints. So, I not sure whether this is a "bug" at current stage(personally I'd like to dig it out because currently we also use the checkpoint like the way you are) ...


Best, Sihua


On 05/16/2018 01:46,Juho Autio<ju...@rovio.com> wrote:
I was able to reproduce this error.


I just happened to notice an important detail about the original failure:

- checkpoint was created with a 1-node cluster (parallelism=8)
- restored on a 2-node cluster (parallelism=16), caused that null exception



I tried restoring again from the problematic checkpoint again

- restored on a 1-node cluster, no problems
- restored on a 2-node cluster, getting the original error!


So now I have a way to reproduce the bug. To me it seems like the checkpoint itself is fine. The bug seems to be in redistributing the state of a restored checkpoint to a higher parallelism. I only tested each cluster size once (as described above) so it could also be coincidence, but seems at least likely now that it's about the state redistribution.


I'll try to follow up with those TRACE-level logs tomorrow. Today I tried adding these to the logback.xml, but I didn't get anything else but INFO level logs:



    <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
        <appender-ref ref="file"/>
    </logger>


Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink 1.5-SNAPSHOT and the package has all of these in the conf/ dir:


log4j-cli.properties
log4j-console.properties
log4j.properties
log4j-yarn-session.properties
logback-console.xml
logback.xml
logback-yarn.xml


On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <s....@data-artisans.com> wrote:

Hi,



Am 15.05.2018 um 10:34 schrieb Juho Autio <ju...@rovio.com>:


Ok, that should be possible to provide. Are there any specific packages to set on trace level? Maybe just go with org.apache.flink.* on TRACE?


The following packages would be helpful:


org.apache.flink.contrib.streaming.state.*
org.apache.flink.runtime.state.*
org.apache.flink.runtime.checkpoint.*
org.apache.flink.streaming.api.operators.*
org.apache.flink.streaming.runtime.tasks.*




> did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files)


I think it happened in various places, maybe not when restoring.. Any way if the situation is like that, the system is pretty much unusable (on OS level), so it shouldn't matter too much which operation of the application it causes to fail? Any way I'll try to grab & share all log lines that say "Too Many Open Files"..


> and did you deactivate it on the second cluster for the restart or changed your OS settings?



No, didn't change anything except for increasing the ulimit on OS to prevent this from happening again. Note that the system only ran out of files after ~11 days of uptime. During that time there had been some local recoveries. This makes me wonder though, could it be that many local recoveries eventually caused this – could it be that in the occasion of local recovery some "old" files are left open, making the system eventually run out of files?




From the way how local recovery works with incremental RocksDB checkpoints, I would not assume that it is the cause of the problem. In this particular case, the number of opened files on a local FS should not be higher than the number without local recovery. Maybe it is just a matter of the OS limit and the number of operators with a RocksDB backend running on the machine and the amount of files managed by all those RocksDB instances that simply exceed the limit. If you have an overview how many parallel operator instances with keyed state were running on the machine and assume some reasonable number of files per RocksDB instance and the limit configured in your OS, could that be the case?




Thanks!


Thanks for your help!




On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s....@data-artisans.com> wrote:

Btw having a trace level log of a restart from a problematic checkpoint could actually be helpful if we cannot find the problem from the previous points. This can give a more detailed view of what checkpoint files are mapped to which operator.


I am having one more question: did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files), and did you deactivate it on the second cluster for the restart or changed your OS settings?




Am 15.05.2018 um 10:09 schrieb Stefan Richter <s....@data-artisans.com>:


What I would like to see from the logs is (also depending a bit on your log level):


- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because I think for checkpoint consistency it should not matter as a checkpoint with such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.


Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:


Thanks all. I'll have to see about sharing the logs & configuration..


Is there something special that you'd like to see from the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.



We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external state path on s3.



The code that we use is:


            env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
            env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


The problematic state that we tried to use was a checkpoint created with this conf.



> Are you using the local recovery feature?


Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.


This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on s3, so I might be able to try to restore it again if needed to debug it. But that would require some tweaking, because I don't want to tangle with the same kafka consumer group offsets or send old data again to production endpoint.


Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.


On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s....@data-artisans.com> wrote:

Hi,


I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?


Best,
Stefan




Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:


We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.



The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete


Our code (simplified):


    private MapState<String, String> mapState;


    public void processElement(..) {
            mapState.put("lastUpdated", ctx.timestamp().toString());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
    }


    public void onTimer(long timestamp, OnTimerContext ctx, ..) {

        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {
            mapState.clear();
        }
    }


Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.


However, after restoring state from a checkpoint, the job kept failing with this error:


Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..


So apparently onTimer was called but lastUpdated wasn't found in the MapState.


The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.



Thank you all for your continued support!


P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.




















Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi Juho,
I tried multi times follow the simple code you privoded, but still can't reproduce the bug you met. There's one more question I'd like to confirm with you, is the stateRetentionMillis a fixed(final) field or it might be changed on some condition?


Best, Sihua
On 05/19/2018 08:19,sihua zhou<su...@163.com> wrote:
Sorry for the incorrect information, that's not the case.


Best, Sihua






On 05/19/2018 07:58,sihua zhou<su...@163.com> wrote:
Hi Juho & Stefan,
just a quick summarize, good news is that I think I found the bug, bad news is that we are currently at the RC4...
The bug is here.


try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
   byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
      startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
}

   iterator.seek(startKeyGroupPrefixBytes);

   while (iterator.isValid()) {

int keyGroup = 0;
      for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
         keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
}

if (stateBackend.keyGroupRange.contains(keyGroup)) {
stateBackend.db.put(targetColumnFamilyHandle,
iterator.key(), iterator.value());
}

      iterator.next();
}
}


for every state handle to get the target data, we _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be INVALID imediately if the state handle's start key group is bigger that _state.keyGroupRange.getStartKeyGroup()_. Then, data lost....


@Stefan, this still need your double check, plz correct me if I'm wrong.


Best, Sihua


On 05/18/2018 17:29,sihua zhou<su...@163.com> wrote:
Hi Juho,
thanks for trying this out. I'm running out of myself now... Let's do bref summarize.


- have we reached a consensus that the map state losts when restoring?(because you can restore successfully without rescaling)
- the timer state is correctly restore, because for timer, when restoring from checkpoint it has no different with storing from savepoint.(@Stefan plz correct me if I'm wrong)


And @Juho, have you try to rescale the job with a different parallelism(not always with 16)?


Best, Sihua








On 05/18/2018 17:14,Juho Autio<ju...@rovio.com> wrote:
Tested with http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz.


It hits the same problem.


Btw, why is this error logged on INFO level?



2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more


On Fri, May 18, 2018 at 11:06 AM, Juho Autio <ju...@rovio.com> wrote:

Thanks Sihua, I'll give that RC a try.


On Fri, May 18, 2018 at 10:58 AM, sihua zhou <su...@163.com> wrote:

Hi Juho,
would you like to try out the latest RC(http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescaling the job from the "problematic" checkpoint? The latest RC includes a fix for the potential silently data lost. If it's the reason, you will see a different exception when you trying to recover you job.


Best, Sihua






On 05/18/2018 15:02,Juho Autio<ju...@rovio.com> wrote:
I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.


As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.


As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?


On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s....@data-artisans.com> wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would preferred to keep this at the level of a "hidden feature“ until it got some more exposure and also some questions about the future of differences between savepoints and checkpoints are solved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan








Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Sorry for the incorrect information, that's not the case.


Best, Sihua






On 05/19/2018 07:58,sihua zhou<su...@163.com> wrote:
Hi Juho & Stefan,
just a quick summarize, good news is that I think I found the bug, bad news is that we are currently at the RC4...
The bug is here.


try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
   byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
      startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
}

   iterator.seek(startKeyGroupPrefixBytes);

   while (iterator.isValid()) {

int keyGroup = 0;
      for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
         keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
}

if (stateBackend.keyGroupRange.contains(keyGroup)) {
stateBackend.db.put(targetColumnFamilyHandle,
iterator.key(), iterator.value());
}

      iterator.next();
}
}


for every state handle to get the target data, we _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be INVALID imediately if the state handle's start key group is bigger that _state.keyGroupRange.getStartKeyGroup()_. Then, data lost....


@Stefan, this still need your double check, plz correct me if I'm wrong.


Best, Sihua


On 05/18/2018 17:29,sihua zhou<su...@163.com> wrote:
Hi Juho,
thanks for trying this out. I'm running out of myself now... Let's do bref summarize.


- have we reached a consensus that the map state losts when restoring?(because you can restore successfully without rescaling)
- the timer state is correctly restore, because for timer, when restoring from checkpoint it has no different with storing from savepoint.(@Stefan plz correct me if I'm wrong)


And @Juho, have you try to rescale the job with a different parallelism(not always with 16)?


Best, Sihua








On 05/18/2018 17:14,Juho Autio<ju...@rovio.com> wrote:
Tested with http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz.


It hits the same problem.


Btw, why is this error logged on INFO level?



2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more


On Fri, May 18, 2018 at 11:06 AM, Juho Autio <ju...@rovio.com> wrote:

Thanks Sihua, I'll give that RC a try.


On Fri, May 18, 2018 at 10:58 AM, sihua zhou <su...@163.com> wrote:

Hi Juho,
would you like to try out the latest RC(http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescaling the job from the "problematic" checkpoint? The latest RC includes a fix for the potential silently data lost. If it's the reason, you will see a different exception when you trying to recover you job.


Best, Sihua






On 05/18/2018 15:02,Juho Autio<ju...@rovio.com> wrote:
I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.


As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.


As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?


On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s....@data-artisans.com> wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would preferred to keep this at the level of a "hidden feature“ until it got some more exposure and also some questions about the future of differences between savepoints and checkpoints are solved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan








Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi Juho & Stefan,
just a quick summarize, good news is that I think I found the bug, bad news is that we are currently at the RC4...
The bug is here.


try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {

int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
   byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
   for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
      startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
}

   iterator.seek(startKeyGroupPrefixBytes);

   while (iterator.isValid()) {

int keyGroup = 0;
      for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
         keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
}

if (stateBackend.keyGroupRange.contains(keyGroup)) {
stateBackend.db.put(targetColumnFamilyHandle,
iterator.key(), iterator.value());
}

      iterator.next();
}
}


for every state handle to get the target data, we _seek(state.keyGroupRange.getStartKeyGroup())_, so the _iterator_ could be INVALID imediately if the state handle's start key group is bigger that _state.keyGroupRange.getStartKeyGroup()_. Then, data lost....


@Stefan, this still need your double check, plz correct me if I'm wrong.


Best, Sihua


On 05/18/2018 17:29,sihua zhou<su...@163.com> wrote:
Hi Juho,
thanks for trying this out. I'm running out of myself now... Let's do bref summarize.


- have we reached a consensus that the map state losts when restoring?(because you can restore successfully without rescaling)
- the timer state is correctly restore, because for timer, when restoring from checkpoint it has no different with storing from savepoint.(@Stefan plz correct me if I'm wrong)


And @Juho, have you try to rescale the job with a different parallelism(not always with 16)?


Best, Sihua








On 05/18/2018 17:14,Juho Autio<ju...@rovio.com> wrote:
Tested with http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz.


It hits the same problem.


Btw, why is this error logged on INFO level?



2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more


On Fri, May 18, 2018 at 11:06 AM, Juho Autio <ju...@rovio.com> wrote:

Thanks Sihua, I'll give that RC a try.


On Fri, May 18, 2018 at 10:58 AM, sihua zhou <su...@163.com> wrote:

Hi Juho,
would you like to try out the latest RC(http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescaling the job from the "problematic" checkpoint? The latest RC includes a fix for the potential silently data lost. If it's the reason, you will see a different exception when you trying to recover you job.


Best, Sihua






On 05/18/2018 15:02,Juho Autio<ju...@rovio.com> wrote:
I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.


As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.


As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?


On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s....@data-artisans.com> wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would preferred to keep this at the level of a "hidden feature“ until it got some more exposure and also some questions about the future of differences between savepoints and checkpoints are solved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan








Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi Juho,
thanks for trying this out. I'm running out of myself now... Let's do bref summarize.


- have we reached a consensus that the map state losts when restoring?(because you can restore successfully without rescaling)
- the timer state is correctly restore, because for timer, when restoring from checkpoint it has no different with storing from savepoint.(@Stefan plz correct me if I'm wrong)


And @Juho, have you try to rescale the job with a different parallelism(not always with 16)?


Best, Sihua








On 05/18/2018 17:14,Juho Autio<ju...@rovio.com> wrote:
Tested with http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz.


It hits the same problem.


Btw, why is this error logged on INFO level?



2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - PlatformIDsProcessFunction -> AppIdFilter([MyApp]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway (capacity=500) -> Sink: ResponseKafkaSink) (8/16) (c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more


On Fri, May 18, 2018 at 11:06 AM, Juho Autio <ju...@rovio.com> wrote:

Thanks Sihua, I'll give that RC a try.


On Fri, May 18, 2018 at 10:58 AM, sihua zhou <su...@163.com> wrote:

Hi Juho,
would you like to try out the latest RC(http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescaling the job from the "problematic" checkpoint? The latest RC includes a fix for the potential silently data lost. If it's the reason, you will see a different exception when you trying to recover you job.


Best, Sihua






On 05/18/2018 15:02,Juho Autio<ju...@rovio.com> wrote:
I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.


As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.


As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?


On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s....@data-artisans.com> wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would preferred to keep this at the level of a "hidden feature“ until it got some more exposure and also some questions about the future of differences between savepoints and checkpoints are solved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan








Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
Tested with
http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz
.

It hits the same problem.

Btw, why is this error logged on INFO level?

2018-05-18 09:03:52,595 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        -
PlatformIDsProcessFunction -> AppIdFilter([MyApp]) ->
DiscardBeforeDateFunction(null) -> DiscardedEventsFunction ->
(LateDataLabelFunction, FooEventMapper -> ThreadPoolFooGateway
(capacity=500) -> Sink: ResponseKafkaSink) (8/16)
(c2c102bcf4e1c7f523a2d00b81ed1a23) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at
com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more
2018-05-18 09:03:52,596 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
FooHttpStream (a54a9210341fd26d210122986f2be844) switched from state
RUNNING to FAILING.
java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at
com.mycompany.ds.flink.http.Foo.EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
at
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
at
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onEventTime(LegacyKeyedProcessOperator.java:75)
at
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more

On Fri, May 18, 2018 at 11:06 AM, Juho Autio <ju...@rovio.com> wrote:

> Thanks Sihua, I'll give that RC a try.
>
> On Fri, May 18, 2018 at 10:58 AM, sihua zhou <su...@163.com> wrote:
>
>> Hi Juho,
>> would you like to try out the latest RC(http://people.apache.org/~t
>> rohrmann/flink-1.5.0-rc4/) to rescaling the job from the "problematic"
>> checkpoint? The latest RC includes a fix for the potential silently data
>> lost. If it's the reason, you will see a different exception when you
>> trying to recover you job.
>>
>> Best, Sihua
>>
>>
>>
>> On 05/18/2018 15:02,Juho Autio<ju...@rovio.com>
>> <ju...@rovio.com> wrote:
>>
>> I see. I appreciate keeping this option available even if it's "beta".
>> The current situation could be documented better, though.
>>
>> As long as rescaling from checkpoint is not officially supported, I would
>> put it behind a flag similar to --allowNonRestoredState. The flag could be
>> called --allowRescalingRestoredCheckpointState, for example. This would
>> make sure that users are aware that what they're using is experimental and
>> might have unexpected effects.
>>
>> As for the bug I faced, indeed I was able to reproduce it consistently.
>> And I have provided TRACE-level logs personally to Stefan. If there is no
>> Jira ticket for this yet, would you like me to create one?
>>
>> On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <
>> s.richter@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> >
>>> > This raises a couple of questions:
>>> > - Is it a bug though, that the state restoring goes wrong like it does
>>> for my job? Based on my experience it seems like rescaling sometimes works,
>>> but then you can have these random errors.
>>>
>>> If there is a problem, I would still consider it a bug because it should
>>> work correctly.
>>>
>>> > - If it's not supported properly, why not refuse to restore a
>>> checkpoint if it would require rescaling?
>>>
>>> It should work properly, but I would preferred to keep this at the level
>>> of a "hidden feature“ until it got some more exposure and also some
>>> questions about the future of differences between savepoints and
>>> checkpoints are solved.
>>>
>>> > - We have sometimes had Flink jobs where the state has become so heavy
>>> that cancelling with a savepoint times out & fails. Incremental checkpoints
>>> are still working because they don't timeout as long as the state is
>>> growing linearly. In that case if we want to scale up (for example to
>>> enable successful savepoint creation ;) ), the only thing we can do is to
>>> restore from the latest checkpoint. But then we have no way to scale up by
>>> increasing the cluster size, because we can't create a savepoint with a
>>> smaller cluster but on the other hand can't restore a checkpoint to a
>>> bigger cluster, if rescaling from a checkpoint is not supposed to be relied
>>> on. So in this case we're stuck and forced to start from an empty state?
>>>
>>> IMO there is a very good chance that this will simply become a normal
>>> feature in the near future.
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>
>

Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
Thanks Sihua, I'll give that RC a try.

On Fri, May 18, 2018 at 10:58 AM, sihua zhou <su...@163.com> wrote:

> Hi Juho,
> would you like to try out the latest RC(http://people.apache.org/~
> trohrmann/flink-1.5.0-rc4/) to rescaling the job from the "problematic"
> checkpoint? The latest RC includes a fix for the potential silently data
> lost. If it's the reason, you will see a different exception when you
> trying to recover you job.
>
> Best, Sihua
>
>
>
> On 05/18/2018 15:02,Juho Autio<ju...@rovio.com>
> <ju...@rovio.com> wrote:
>
> I see. I appreciate keeping this option available even if it's "beta". The
> current situation could be documented better, though.
>
> As long as rescaling from checkpoint is not officially supported, I would
> put it behind a flag similar to --allowNonRestoredState. The flag could be
> called --allowRescalingRestoredCheckpointState, for example. This would
> make sure that users are aware that what they're using is experimental and
> might have unexpected effects.
>
> As for the bug I faced, indeed I was able to reproduce it consistently.
> And I have provided TRACE-level logs personally to Stefan. If there is no
> Jira ticket for this yet, would you like me to create one?
>
> On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <
> s.richter@data-artisans.com> wrote:
>
>> Hi,
>>
>> >
>> > This raises a couple of questions:
>> > - Is it a bug though, that the state restoring goes wrong like it does
>> for my job? Based on my experience it seems like rescaling sometimes works,
>> but then you can have these random errors.
>>
>> If there is a problem, I would still consider it a bug because it should
>> work correctly.
>>
>> > - If it's not supported properly, why not refuse to restore a
>> checkpoint if it would require rescaling?
>>
>> It should work properly, but I would preferred to keep this at the level
>> of a "hidden feature“ until it got some more exposure and also some
>> questions about the future of differences between savepoints and
>> checkpoints are solved.
>>
>> > - We have sometimes had Flink jobs where the state has become so heavy
>> that cancelling with a savepoint times out & fails. Incremental checkpoints
>> are still working because they don't timeout as long as the state is
>> growing linearly. In that case if we want to scale up (for example to
>> enable successful savepoint creation ;) ), the only thing we can do is to
>> restore from the latest checkpoint. But then we have no way to scale up by
>> increasing the cluster size, because we can't create a savepoint with a
>> smaller cluster but on the other hand can't restore a checkpoint to a
>> bigger cluster, if rescaling from a checkpoint is not supposed to be relied
>> on. So in this case we're stuck and forced to start from an empty state?
>>
>> IMO there is a very good chance that this will simply become a normal
>> feature in the near future.
>>
>> Best,
>> Stefan
>>
>>
>

Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi Juho,
would you like to try out the latest RC(http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescaling the job from the "problematic" checkpoint? The latest RC includes a fix for the potential silently data lost. If it's the reason, you will see a different exception when you trying to recover you job.


Best, Sihua






On 05/18/2018 15:02,Juho Autio<ju...@rovio.com> wrote:
I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.


As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.


As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?


On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s....@data-artisans.com> wrote:
Hi,

>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would preferred to keep this at the level of a "hidden feature“ until it got some more exposure and also some questions about the future of differences between savepoints and checkpoints are solved.

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan




Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
> If you say that you can reproduce the problem, does that mean reproduce
from the single existing checkpoint

Yes.

I haven't tried creating another checkpoint and rescaling from it. I can
try that.

> We are including rescaling in some end-to-end tests now and then let’s
see what happens.

If I understood correctly, there is some difference in how timers & other
state are written. It might be interesting if you would include a test with
state that holds both timers and keyed MapState, like the code snippet in
my original message. I believe this is a common usage pattern any way. Test
should verify transactionality of restoring both timers & MapState
consistently.

On Fri, May 18, 2018 at 10:51 AM, Stefan Richter <
s.richter@data-artisans.com> wrote:

> Hi,
>
> I had a look at the logs from the restoring job and couldn’t find anything
> suspicious in them. Everything looks as expected and the state files are
> properly found and transferred from S3. We are including rescaling in some
> end-to-end tests now and then let’s see what happens.
> If you say that you can reproduce the problem, does that mean reproduce
> from the single existing checkpoint or also creating other problematic
> checkpoints? I am asking because maybe a log from the job that produces the
> problematic checkpoint might be more helpful. You can create a ticket if
> you want.
>
> Best,
> Stefan
>
>
> Am 18.05.2018 um 09:02 schrieb Juho Autio <ju...@rovio.com>:
>
> I see. I appreciate keeping this option available even if it's "beta". The
> current situation could be documented better, though.
>
> As long as rescaling from checkpoint is not officially supported, I would
> put it behind a flag similar to --allowNonRestoredState. The flag could be
> called --allowRescalingRestoredCheckpointState, for example. This would
> make sure that users are aware that what they're using is experimental and
> might have unexpected effects.
>
> As for the bug I faced, indeed I was able to reproduce it consistently.
> And I have provided TRACE-level logs personally to Stefan. If there is no
> Jira ticket for this yet, would you like me to create one?
>
> On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <
> s.richter@data-artisans.com> wrote:
>
>> Hi,
>>
>> >
>> > This raises a couple of questions:
>> > - Is it a bug though, that the state restoring goes wrong like it does
>> for my job? Based on my experience it seems like rescaling sometimes works,
>> but then you can have these random errors.
>>
>> If there is a problem, I would still consider it a bug because it should
>> work correctly.
>>
>> > - If it's not supported properly, why not refuse to restore a
>> checkpoint if it would require rescaling?
>>
>> It should work properly, but I would preferred to keep this at the level
>> of a "hidden feature“ until it got some more exposure and also some
>> questions about the future of differences between savepoints and
>> checkpoints are solved.
>>
>> > - We have sometimes had Flink jobs where the state has become so heavy
>> that cancelling with a savepoint times out & fails. Incremental checkpoints
>> are still working because they don't timeout as long as the state is
>> growing linearly. In that case if we want to scale up (for example to
>> enable successful savepoint creation ;) ), the only thing we can do is to
>> restore from the latest checkpoint. But then we have no way to scale up by
>> increasing the cluster size, because we can't create a savepoint with a
>> smaller cluster but on the other hand can't restore a checkpoint to a
>> bigger cluster, if rescaling from a checkpoint is not supposed to be relied
>> on. So in this case we're stuck and forced to start from an empty state?
>>
>> IMO there is a very good chance that this will simply become a normal
>> feature in the near future.
>>
>> Best,
>> Stefan
>>
>>
>
>

Re: Missing MapState when Timer fires after restored state

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I had a look at the logs from the restoring job and couldn’t find anything suspicious in them. Everything looks as expected and the state files are properly found and transferred from S3. We are including rescaling in some end-to-end tests now and then let’s see what happens. 
If you say that you can reproduce the problem, does that mean reproduce from the single existing checkpoint or also creating other problematic checkpoints? I am asking because maybe a log from the job that produces the problematic checkpoint might be more helpful. You can create a ticket if you want.

Best,
Stefan

> Am 18.05.2018 um 09:02 schrieb Juho Autio <ju...@rovio.com>:
> 
> I see. I appreciate keeping this option available even if it's "beta". The current situation could be documented better, though.
> 
> As long as rescaling from checkpoint is not officially supported, I would put it behind a flag similar to --allowNonRestoredState. The flag could be called --allowRescalingRestoredCheckpointState, for example. This would make sure that users are aware that what they're using is experimental and might have unexpected effects.
> 
> As for the bug I faced, indeed I was able to reproduce it consistently. And I have provided TRACE-level logs personally to Stefan. If there is no Jira ticket for this yet, would you like me to create one?
> 
> On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
> Hi,
> 
> > 
> > This raises a couple of questions:
> > - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.
> 
> If there is a problem, I would still consider it a bug because it should work correctly.
> 
> > - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?
> 
> It should work properly, but I would preferred to keep this at the level of a "hidden feature“ until it got some more exposure and also some questions about the future of differences between savepoints and checkpoints are solved. 
> 
> > - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?
> 
> IMO there is a very good chance that this will simply become a normal feature in the near future.
> 
> Best,
> Stefan
> 
> 


Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
I see. I appreciate keeping this option available even if it's "beta". The
current situation could be documented better, though.

As long as rescaling from checkpoint is not officially supported, I would
put it behind a flag similar to --allowNonRestoredState. The flag could be
called --allowRescalingRestoredCheckpointState, for example. This would
make sure that users are aware that what they're using is experimental and
might have unexpected effects.

As for the bug I faced, indeed I was able to reproduce it consistently. And
I have provided TRACE-level logs personally to Stefan. If there is no Jira
ticket for this yet, would you like me to create one?

On Thu, May 17, 2018 at 1:00 PM, Stefan Richter <s.richter@data-artisans.com
> wrote:

> Hi,
>
> >
> > This raises a couple of questions:
> > - Is it a bug though, that the state restoring goes wrong like it does
> for my job? Based on my experience it seems like rescaling sometimes works,
> but then you can have these random errors.
>
> If there is a problem, I would still consider it a bug because it should
> work correctly.
>
> > - If it's not supported properly, why not refuse to restore a checkpoint
> if it would require rescaling?
>
> It should work properly, but I would preferred to keep this at the level
> of a "hidden feature“ until it got some more exposure and also some
> questions about the future of differences between savepoints and
> checkpoints are solved.
>
> > - We have sometimes had Flink jobs where the state has become so heavy
> that cancelling with a savepoint times out & fails. Incremental checkpoints
> are still working because they don't timeout as long as the state is
> growing linearly. In that case if we want to scale up (for example to
> enable successful savepoint creation ;) ), the only thing we can do is to
> restore from the latest checkpoint. But then we have no way to scale up by
> increasing the cluster size, because we can't create a savepoint with a
> smaller cluster but on the other hand can't restore a checkpoint to a
> bigger cluster, if rescaling from a checkpoint is not supposed to be relied
> on. So in this case we're stuck and forced to start from an empty state?
>
> IMO there is a very good chance that this will simply become a normal
> feature in the near future.
>
> Best,
> Stefan
>
>

Re: Missing MapState when Timer fires after restored state

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

> 
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.

If there is a problem, I would still consider it a bug because it should work correctly.

> - If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?

It should work properly, but I would preferred to keep this at the level of a "hidden feature“ until it got some more exposure and also some questions about the future of differences between savepoints and checkpoints are solved. 

> - We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?

IMO there is a very good chance that this will simply become a normal feature in the near future.

Best,
Stefan


Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi Juho,
I'm agree with you that sometimes it seems like a nightmare to create a savepoint for a state heavy job, it's also the reason that we use the checkpoint to recover the job. In fact, in our cases, it often takes more that 10min to take a savepoint successfully...Even though, we didn't meet the problem like yours when restoring from checkpoint, but if recovery from checkpoint could be guaranteed to support rescaling that would be helpful definitely. I think maybe we could fire a DISCUSSION on the dev mail about this once 1.5 is releasing out, I think the community is very busy with the releasing works currently. 


Best, Sihua


On 05/16/2018 17:54,Juho Autio<ju...@rovio.com> wrote:
Thanks Sihua.


Stefan wrote: "we do not want that user rely on their checkpoints to be rescalable" (https://github.com/apache/flink/pull/5490#issuecomment-365887734)



This raises a couple of questions:
- Is it a bug though, that the state restoring goes wrong like it does for my job? Based on my experience it seems like rescaling sometimes works, but then you can have these random errors.
- If it's not supported properly, why not refuse to restore a checkpoint if it would require rescaling?
- We have sometimes had Flink jobs where the state has become so heavy that cancelling with a savepoint times out & fails. Incremental checkpoints are still working because they don't timeout as long as the state is growing linearly. In that case if we want to scale up (for example to enable successful savepoint creation ;) ), the only thing we can do is to restore from the latest checkpoint. But then we have no way to scale up by increasing the cluster size, because we can't create a savepoint with a smaller cluster but on the other hand can't restore a checkpoint to a bigger cluster, if rescaling from a checkpoint is not supposed to be relied on. So in this case we're stuck and forced to start from an empty state?


To us it makes Flink job maintenance in production much easier if we can simply scale up from a restored checkpoint without worrying about if we have a recent enough savepoint available – which we quite often may not have especially when there's a problem that requires upscaling.


On Wed, May 16, 2018 at 12:30 PM, sihua zhou <su...@163.com> wrote:

Hi, Juho


> If restoring + rescaling a checkpoint is not supported properly, I don't understand why Flink doesn't entirely refuse to restore in that case?


I think you're asking the question I have asked in https://github.com/apache/flink/pull/5490, you can refer to it and find the comments there.


@Stefan, PR(https://github.com/apache/flink/pull/6020) has been prepared.


Best, Sihua




On 05/16/2018 17:20,Juho Autio<ju...@rovio.com> wrote:
Yes, I'm rescaling from a checkpoint.


> that behavior is not guaranteed yet


If restoring + rescaling a checkpoint is not supported properly, I don't understand why Flink doesn't entirely refuse to restore in that case?



Note that I'm using a rather new 1.5-SNAPSHOT, not release 1.4. To be exact, the package was built at flink commit 8395508b0401353ed07375e22882e7581d46ac0e.



I also made this additional test:
- launch again from the checkpoint with the original parallelism=8
- cancel with savepoint (this was after ~7 minutes, during which the job also created some new incremental checkpoints
- restore the newly created savepoint with parallelism=16
- it works


Now I have restored from the original checkpoint multiple times and it consistently fails with that java.lang.NumberFormatException when trying with parallelism=18.


I was able to enable trace-level logs with this change (nothing changed in logback.xml):


Added these lines to flink-1.5-SNAPSHOT/conf/log4j.properties:



# To debug checkpoint restoring
log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
log4j.logger.org.apache.flink.runtime.state=TRACE
log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
log4j.logger.org.apache.flink.streaming.api.operators=TRACE
log4j.logger.org.apache.flink.streaming.runtime.tasks=TRACE


(This is confusing to me – I had understood that Flink would have migrated entirely to use logback configuration instead of log4j?)


I modified the logs to remove sensitive information, but because I'm not 100% sure that I caught everything, I will share the logs personally with Stefan only.


On Wed, May 16, 2018 at 5:22 AM, sihua zhou <su...@163.com> wrote:

Hi Juho,
if I'm not misunderstand, you saied your're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet, you can find this on the doc https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints. So, I not sure whether this is a "bug" at current stage(personally I'd like to dig it out because currently we also use the checkpoint like the way you are) ...


Best, Sihua


On 05/16/2018 01:46,Juho Autio<ju...@rovio.com> wrote:
I was able to reproduce this error.


I just happened to notice an important detail about the original failure:

- checkpoint was created with a 1-node cluster (parallelism=8)
- restored on a 2-node cluster (parallelism=16), caused that null exception



I tried restoring again from the problematic checkpoint again

- restored on a 1-node cluster, no problems
- restored on a 2-node cluster, getting the original error!


So now I have a way to reproduce the bug. To me it seems like the checkpoint itself is fine. The bug seems to be in redistributing the state of a restored checkpoint to a higher parallelism. I only tested each cluster size once (as described above) so it could also be coincidence, but seems at least likely now that it's about the state redistribution.


I'll try to follow up with those TRACE-level logs tomorrow. Today I tried adding these to the logback.xml, but I didn't get anything else but INFO level logs:



    <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
        <appender-ref ref="file"/>
    </logger>


Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink 1.5-SNAPSHOT and the package has all of these in the conf/ dir:


log4j-cli.properties
log4j-console.properties
log4j.properties
log4j-yarn-session.properties
logback-console.xml
logback.xml
logback-yarn.xml


On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <s....@data-artisans.com> wrote:

Hi,



Am 15.05.2018 um 10:34 schrieb Juho Autio <ju...@rovio.com>:


Ok, that should be possible to provide. Are there any specific packages to set on trace level? Maybe just go with org.apache.flink.* on TRACE?


The following packages would be helpful:


org.apache.flink.contrib.streaming.state.*
org.apache.flink.runtime.state.*
org.apache.flink.runtime.checkpoint.*
org.apache.flink.streaming.api.operators.*
org.apache.flink.streaming.runtime.tasks.*




> did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files)


I think it happened in various places, maybe not when restoring.. Any way if the situation is like that, the system is pretty much unusable (on OS level), so it shouldn't matter too much which operation of the application it causes to fail? Any way I'll try to grab & share all log lines that say "Too Many Open Files"..


> and did you deactivate it on the second cluster for the restart or changed your OS settings?



No, didn't change anything except for increasing the ulimit on OS to prevent this from happening again. Note that the system only ran out of files after ~11 days of uptime. During that time there had been some local recoveries. This makes me wonder though, could it be that many local recoveries eventually caused this – could it be that in the occasion of local recovery some "old" files are left open, making the system eventually run out of files?




From the way how local recovery works with incremental RocksDB checkpoints, I would not assume that it is the cause of the problem. In this particular case, the number of opened files on a local FS should not be higher than the number without local recovery. Maybe it is just a matter of the OS limit and the number of operators with a RocksDB backend running on the machine and the amount of files managed by all those RocksDB instances that simply exceed the limit. If you have an overview how many parallel operator instances with keyed state were running on the machine and assume some reasonable number of files per RocksDB instance and the limit configured in your OS, could that be the case?




Thanks!


Thanks for your help!




On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s....@data-artisans.com> wrote:

Btw having a trace level log of a restart from a problematic checkpoint could actually be helpful if we cannot find the problem from the previous points. This can give a more detailed view of what checkpoint files are mapped to which operator.


I am having one more question: did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files), and did you deactivate it on the second cluster for the restart or changed your OS settings?




Am 15.05.2018 um 10:09 schrieb Stefan Richter <s....@data-artisans.com>:


What I would like to see from the logs is (also depending a bit on your log level):


- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because I think for checkpoint consistency it should not matter as a checkpoint with such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.


Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:


Thanks all. I'll have to see about sharing the logs & configuration..


Is there something special that you'd like to see from the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.



We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external state path on s3.



The code that we use is:


            env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
            env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


The problematic state that we tried to use was a checkpoint created with this conf.



> Are you using the local recovery feature?


Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.


This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on s3, so I might be able to try to restore it again if needed to debug it. But that would require some tweaking, because I don't want to tangle with the same kafka consumer group offsets or send old data again to production endpoint.


Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.


On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s....@data-artisans.com> wrote:

Hi,


I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?


Best,
Stefan




Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:


We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.



The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete


Our code (simplified):


    private MapState<String, String> mapState;


    public void processElement(..) {
            mapState.put("lastUpdated", ctx.timestamp().toString());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
    }


    public void onTimer(long timestamp, OnTimerContext ctx, ..) {

        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {
            mapState.clear();
        }
    }


Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.


However, after restoring state from a checkpoint, the job kept failing with this error:


Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..


So apparently onTimer was called but lastUpdated wasn't found in the MapState.


The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.



Thank you all for your continued support!


P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.
























Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
Thanks Sihua.

Stefan wrote: "we do not want that user rely on their checkpoints to be
rescalable" (
https://github.com/apache/flink/pull/5490#issuecomment-365887734)

This raises a couple of questions:
- Is it a bug though, that the state restoring goes wrong like it does for
my job? Based on my experience it seems like rescaling sometimes works, but
then you can have these random errors.
- If it's not supported properly, why not refuse to restore a checkpoint if
it would require rescaling?
- We have sometimes had Flink jobs where the state has become so heavy that
cancelling with a savepoint times out & fails. Incremental checkpoints are
still working because they don't timeout as long as the state is growing
linearly. In that case if we want to scale up (for example to enable
successful savepoint creation ;) ), the only thing we can do is to restore
from the latest checkpoint. But then we have no way to scale up by
increasing the cluster size, because we can't create a savepoint with a
smaller cluster but on the other hand can't restore a checkpoint to a
bigger cluster, if rescaling from a checkpoint is not supposed to be relied
on. So in this case we're stuck and forced to start from an empty state?

To us it makes Flink job maintenance in production much easier if we can
simply scale up from a restored checkpoint without worrying about if we
have a recent enough savepoint available – which we quite often may not
have especially when there's a problem that requires upscaling.

On Wed, May 16, 2018 at 12:30 PM, sihua zhou <su...@163.com> wrote:

> Hi, Juho
>
> > If restoring + rescaling a checkpoint is not supported properly, I don't
> understand why Flink doesn't entirely refuse to restore in that case?
>
> I think you're asking the question I have asked in
> https://github.com/apache/flink/pull/5490, you can refer to it and find
> the comments there.
>
> @Stefan, PR(https://github.com/apache/flink/pull/6020) has been prepared.
>
> Best, Sihua
>
>
> On 05/16/2018 17:20,Juho Autio<ju...@rovio.com>
> <ju...@rovio.com> wrote:
>
> Yes, I'm rescaling from a checkpoint.
>
> > that behavior is not guaranteed yet
>
> If restoring + rescaling a checkpoint is not supported properly, I don't
> understand why Flink doesn't entirely refuse to restore in that case?
>
> Note that I'm using a rather new 1.5-SNAPSHOT, not release 1.4. To be
> exact, the package was built at flink commit 8395508b0401353ed07375e22882e7
> 581d46ac0e.
>
> I also made this additional test:
> - launch again from the checkpoint with the original parallelism=8
> - cancel with savepoint (this was after ~7 minutes, during which the job
> also created some new incremental checkpoints
> - restore the newly created savepoint with parallelism=16
> - it works
>
> Now I have restored from the original checkpoint multiple times and it
> consistently fails with that java.lang.NumberFormatException when trying
> with parallelism=18.
>
> I was able to enable trace-level logs with this change (nothing changed in
> logback.xml):
>
> Added these lines to flink-1.5-SNAPSHOT/conf/log4j.properties:
>
> # To debug checkpoint restoring
> log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
> log4j.logger.org.apache.flink.runtime.state=TRACE
> log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
> log4j.logger.org.apache.flink.streaming.api.operators=TRACE
> log4j.logger.org.apache.flink.streaming.runtime.tasks=TRACE
>
> (This is confusing to me – I had understood that Flink would have migrated
> entirely to use logback configuration instead of log4j?)
>
> I modified the logs to remove sensitive information, but because I'm not
> 100% sure that I caught everything, I will share the logs personally with
> Stefan only.
>
> On Wed, May 16, 2018 at 5:22 AM, sihua zhou <su...@163.com> wrote:
>
>> Hi Juho,
>> if I'm not misunderstand, you saied your're rescaling the job from the
>> checkpoint? If yes, I think that behavior is not guaranteed yet, you can
>> find this on the doc https://ci.apache.org/proj
>> ects/flink/flink-docs-release-1.4/ops/state/checkpoints.html
>> #difference-to-savepoints. So, I not sure whether this is a "bug" at
>> current stage(personally I'd like to dig it out because currently we also
>> use the checkpoint like the way you are) ...
>>
>> Best, Sihua
>>
>> On 05/16/2018 01:46,Juho Autio<ju...@rovio.com>
>> <ju...@rovio.com> wrote:
>>
>> I was able to reproduce this error.
>>
>> I just happened to notice an important detail about the original failure:
>> - checkpoint was created with a 1-node cluster (parallelism=8)
>> - restored on a 2-node cluster (parallelism=16), caused that null
>> exception
>>
>> I tried restoring again from the problematic checkpoint again
>> - restored on a 1-node cluster, no problems
>> - restored on a 2-node cluster, getting the original error!
>>
>> So now I have a way to reproduce the bug. To me it seems like the
>> checkpoint itself is fine. The bug seems to be in redistributing the state
>> of a restored checkpoint to a higher parallelism. I only tested each
>> cluster size once (as described above) so it could also be coincidence, but
>> seems at least likely now that it's about the state redistribution.
>>
>> I'll try to follow up with those TRACE-level logs tomorrow. Today I tried
>> adding these to the logback.xml, but I didn't get anything else but INFO
>> level logs:
>>
>>     <logger name="org.apache.flink.contrib.streaming.state"
>> level="TRACE">
>>         <appender-ref ref="file"/>
>>     </logger>
>>     <logger name="org.apache.flink.runtime.state" level="TRACE">
>>         <appender-ref ref="file"/>
>>     </logger>
>>     <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
>>         <appender-ref ref="file"/>
>>     </logger>
>>     <logger name="org.apache.flink.streaming.api.operators"
>> level="TRACE">
>>         <appender-ref ref="file"/>
>>     </logger>
>>     <logger name="org.apache.flink.streaming.runtime.tasks"
>> level="TRACE">
>>         <appender-ref ref="file"/>
>>     </logger>
>>
>> Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink
>> 1.5-SNAPSHOT and the package has all of these in the conf/ dir:
>>
>> log4j-cli.properties
>> log4j-console.properties
>> log4j.properties
>> log4j-yarn-session.properties
>> logback-console.xml
>> logback.xml
>> logback-yarn.xml
>>
>> On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <
>> s.richter@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> Am 15.05.2018 um 10:34 schrieb Juho Autio <ju...@rovio.com>:
>>>
>>> Ok, that should be possible to provide. Are there any specific packages
>>> to set on trace level? Maybe just go with org.apache.flink.* on TRACE?
>>>
>>>
>>> The following packages would be helpful:
>>>
>>> org.apache.flink.contrib.streaming.state.*
>>> org.apache.flink.runtime.state.*
>>> org.apache.flink.runtime.checkpoint.*
>>> org.apache.flink.streaming.api.operators.*
>>> org.apache.flink.streaming.runtime.tasks.*
>>>
>>>
>>> > did the „too many open files“ problem only happen with local recovery
>>> (asking since it should actually not add the the amount of open files)
>>>
>>> I think it happened in various places, maybe not when restoring.. Any
>>> way if the situation is like that, the system is pretty much unusable (on
>>> OS level), so it shouldn't matter too much which operation of the
>>> application it causes to fail? Any way I'll try to grab & share all log
>>> lines that say "Too Many Open Files"..
>>>
>>> > and did you deactivate it on the second cluster for the restart or
>>> changed your OS settings?
>>>
>>> No, didn't change anything except for increasing the ulimit on OS to
>>> prevent this from happening again. Note that the system only ran out of
>>> files after ~11 days of uptime. During that time there had been some local
>>> recoveries. This makes me wonder though, could it be that many local
>>> recoveries eventually caused this – could it be that in the occasion of
>>> local recovery some "old" files are left open, making the system eventually
>>> run out of files?
>>>
>>>
>>>
>>> From the way how local recovery works with incremental RocksDB
>>> checkpoints, I would not assume that it is the cause of the problem. In
>>> this particular case, the number of opened files on a local FS should not
>>> be higher than the number without local recovery. Maybe it is just a matter
>>> of the OS limit and the number of operators with a RocksDB backend running
>>> on the machine and the amount of files managed by all those RocksDB
>>> instances that simply exceed the limit. If you have an overview how many
>>> parallel operator instances with keyed state were running on the machine
>>> and assume some reasonable number of files per RocksDB instance and the
>>> limit configured in your OS, could that be the case?
>>>
>>>
>>> Thanks!
>>>
>>>
>>> Thanks for your help!
>>>
>>>
>>> On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <
>>> s.richter@data-artisans.com> wrote:
>>>
>>>> Btw having a trace level log of a restart from a problematic checkpoint
>>>> could actually be helpful if we cannot find the problem from the previous
>>>> points. This can give a more detailed view of what checkpoint files are
>>>> mapped to which operator.
>>>>
>>>> I am having one more question: did the „too many open files“ problem
>>>> only happen with local recovery (asking since it should actually not add
>>>> the the amount of open files), and did you deactivate it on the second
>>>> cluster for the restart or changed your OS settings?
>>>>
>>>>
>>>> Am 15.05.2018 um 10:09 schrieb Stefan Richter <
>>>> s.richter@data-artisans.com>:
>>>>
>>>> What I would like to see from the logs is (also depending a bit on your
>>>> log level):
>>>>
>>>> - all exceptions.
>>>> - in which context exactly the „too many open files“ problem occurred,
>>>> because I think for checkpoint consistency it should not matter as a
>>>> checkpoint with such a problem should never succeed.
>>>> - files that are written for checkpoints/savepoints.
>>>> - completed checkpoints/savepoints ids.
>>>> - the restored checkpoint/savepoint id.
>>>> - files that are loaded on restore.
>>>>
>>>> Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:
>>>>
>>>> Thanks all. I'll have to see about sharing the logs & configuration..
>>>>
>>>> Is there something special that you'd like to see from the logs? It may
>>>> be easier for me to get specific lines and obfuscate sensitive information
>>>> instead of trying to do that for the full logs.
>>>>
>>>> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true,
>>>> external state path on s3.
>>>>
>>>> The code that we use is:
>>>>
>>>>             env.setStateBackend(getStateBackend(statePath, new
>>>> RocksDBStateBackend(statePath, true)));
>>>>             env.getCheckpointConfig().setM
>>>> inPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 *
>>>> 1000));
>>>>             env.getCheckpointConfig().setM
>>>> axConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
>>>>             env.getCheckpointConfig().setC
>>>> heckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>>>>             env.getCheckpointConfig().enab
>>>> leExternalizedCheckpoints(CheckpointConfig.ExternalizedCheck
>>>> pointCleanup.RETAIN_ON_CANCELLATION);
>>>>
>>>> The problematic state that we tried to use was a checkpoint created
>>>> with this conf.
>>>>
>>>> > Are you using the local recovery feature?
>>>>
>>>> Yes, and in this particular case the job was constantly
>>>> failing/restarting because of Too Many Open Files. So we terminated the
>>>> cluster entirely, created a new one, and launched a new job by specifying
>>>> the latest checkpoint path to restore state from.
>>>>
>>>> This is the only time I have seen this error happen with timer state. I
>>>> still have that bad checkpoint data on s3, so I might be able to try to
>>>> restore it again if needed to debug it. But that would require some
>>>> tweaking, because I don't want to tangle with the same kafka consumer group
>>>> offsets or send old data again to production endpoint.
>>>>
>>>> Please keep in mind that there was that Too Many Open Files issue on
>>>> the cluster that created the problematic checkpoint, if you think that's
>>>> relevant.
>>>>
>>>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <
>>>> s.richter@data-artisans.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I agree, this looks like a bug. Can you tell us your exact
>>>>> configuration of the state backend, e.g. if you are using incremental
>>>>> checkpoints or not. Are you using the local recovery feature? Are you
>>>>> restarting the job from a checkpoint or a savepoint? Can you provide logs
>>>>> for both the job that failed and the restarted job?
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>>
>>>>> Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:
>>>>>
>>>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear
>>>>> old state. After restoring state from a checkpoint, it seems like a timer
>>>>> had been restored, but not the data that was expected to be in a related
>>>>> MapState if such timer has been added.
>>>>>
>>>>> The way I see this is that there's a bug, either of these:
>>>>> - The writing of timers & map states to Flink state is not
>>>>> synchronized (or maybe there are no such guarantees by design?)
>>>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>>>>
>>>>> Our code (simplified):
>>>>>
>>>>>     private MapState<String, String> mapState;
>>>>>
>>>>>     public void processElement(..) {
>>>>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>>>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp()
>>>>> + stateRetentionMillis);
>>>>>     }
>>>>>
>>>>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>>>         long lastUpdated = Long.parseLong(mapState.get("l
>>>>> astUpdated"));
>>>>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>>>             mapState.clear();
>>>>>         }
>>>>>     }
>>>>>
>>>>> Normally this "just works". As you can see, it shouldn't be possible
>>>>> that "lastUpdated" doesn't exist in state if timer was registered and
>>>>> onTimer gets called.
>>>>>
>>>>> However, after restoring state from a checkpoint, the job kept failing
>>>>> with this error:
>>>>>
>>>>> Caused by: java.lang.NumberFormatException: null
>>>>> at java.lang.Long.parseLong(Long.java:552)
>>>>> at java.lang.Long.parseLong(Long.java:631)
>>>>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunct
>>>>> ion.java:136)
>>>>> ..
>>>>>
>>>>> So apparently onTimer was called but lastUpdated wasn't found in the
>>>>> MapState.
>>>>>
>>>>> The background for restoring state in this case is not entirely clean.
>>>>> There was an OS level issue "Too many open files" after running a job for
>>>>> ~11 days. To fix that, we replaced the cluster with a new one and launched
>>>>> the Flink job again. State was successfully restored from the latest
>>>>> checkpoint that had been created by the "problematic execution". Now, I'm
>>>>> assuming that if the state wouldn't have been created successfully,
>>>>> restoring wouldn't succeed either – correct? This is just to rule out that
>>>>> the issue with state didn't happen because the checkpoint files were
>>>>> somehow corrupted due to the Too many open files problem.
>>>>>
>>>>> Thank you all for your continued support!
>>>>>
>>>>> P.S. I would be very much interested to hear if there's some cleaner
>>>>> way to achieve this kind of TTL for keyed state in Flink.
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>

Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi, Juho


> If restoring + rescaling a checkpoint is not supported properly, I don't understand why Flink doesn't entirely refuse to restore in that case?


I think you're asking the question I have asked in https://github.com/apache/flink/pull/5490, you can refer to it and find the comments there.


@Stefan, PR(https://github.com/apache/flink/pull/6020) has been prepared.


Best, Sihua




On 05/16/2018 17:20,Juho Autio<ju...@rovio.com> wrote:
Yes, I'm rescaling from a checkpoint.


> that behavior is not guaranteed yet


If restoring + rescaling a checkpoint is not supported properly, I don't understand why Flink doesn't entirely refuse to restore in that case?



Note that I'm using a rather new 1.5-SNAPSHOT, not release 1.4. To be exact, the package was built at flink commit 8395508b0401353ed07375e22882e7581d46ac0e.



I also made this additional test:
- launch again from the checkpoint with the original parallelism=8
- cancel with savepoint (this was after ~7 minutes, during which the job also created some new incremental checkpoints
- restore the newly created savepoint with parallelism=16
- it works


Now I have restored from the original checkpoint multiple times and it consistently fails with that java.lang.NumberFormatException when trying with parallelism=18.


I was able to enable trace-level logs with this change (nothing changed in logback.xml):


Added these lines to flink-1.5-SNAPSHOT/conf/log4j.properties:



# To debug checkpoint restoring
log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
log4j.logger.org.apache.flink.runtime.state=TRACE
log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
log4j.logger.org.apache.flink.streaming.api.operators=TRACE
log4j.logger.org.apache.flink.streaming.runtime.tasks=TRACE


(This is confusing to me – I had understood that Flink would have migrated entirely to use logback configuration instead of log4j?)


I modified the logs to remove sensitive information, but because I'm not 100% sure that I caught everything, I will share the logs personally with Stefan only.


On Wed, May 16, 2018 at 5:22 AM, sihua zhou <su...@163.com> wrote:

Hi Juho,
if I'm not misunderstand, you saied your're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet, you can find this on the doc https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints. So, I not sure whether this is a "bug" at current stage(personally I'd like to dig it out because currently we also use the checkpoint like the way you are) ...


Best, Sihua


On 05/16/2018 01:46,Juho Autio<ju...@rovio.com> wrote:
I was able to reproduce this error.


I just happened to notice an important detail about the original failure:

- checkpoint was created with a 1-node cluster (parallelism=8)
- restored on a 2-node cluster (parallelism=16), caused that null exception



I tried restoring again from the problematic checkpoint again

- restored on a 1-node cluster, no problems
- restored on a 2-node cluster, getting the original error!


So now I have a way to reproduce the bug. To me it seems like the checkpoint itself is fine. The bug seems to be in redistributing the state of a restored checkpoint to a higher parallelism. I only tested each cluster size once (as described above) so it could also be coincidence, but seems at least likely now that it's about the state redistribution.


I'll try to follow up with those TRACE-level logs tomorrow. Today I tried adding these to the logback.xml, but I didn't get anything else but INFO level logs:



    <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
        <appender-ref ref="file"/>
    </logger>


Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink 1.5-SNAPSHOT and the package has all of these in the conf/ dir:


log4j-cli.properties
log4j-console.properties
log4j.properties
log4j-yarn-session.properties
logback-console.xml
logback.xml
logback-yarn.xml


On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <s....@data-artisans.com> wrote:

Hi,



Am 15.05.2018 um 10:34 schrieb Juho Autio <ju...@rovio.com>:


Ok, that should be possible to provide. Are there any specific packages to set on trace level? Maybe just go with org.apache.flink.* on TRACE?


The following packages would be helpful:


org.apache.flink.contrib.streaming.state.*
org.apache.flink.runtime.state.*
org.apache.flink.runtime.checkpoint.*
org.apache.flink.streaming.api.operators.*
org.apache.flink.streaming.runtime.tasks.*




> did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files)


I think it happened in various places, maybe not when restoring.. Any way if the situation is like that, the system is pretty much unusable (on OS level), so it shouldn't matter too much which operation of the application it causes to fail? Any way I'll try to grab & share all log lines that say "Too Many Open Files"..


> and did you deactivate it on the second cluster for the restart or changed your OS settings?



No, didn't change anything except for increasing the ulimit on OS to prevent this from happening again. Note that the system only ran out of files after ~11 days of uptime. During that time there had been some local recoveries. This makes me wonder though, could it be that many local recoveries eventually caused this – could it be that in the occasion of local recovery some "old" files are left open, making the system eventually run out of files?




From the way how local recovery works with incremental RocksDB checkpoints, I would not assume that it is the cause of the problem. In this particular case, the number of opened files on a local FS should not be higher than the number without local recovery. Maybe it is just a matter of the OS limit and the number of operators with a RocksDB backend running on the machine and the amount of files managed by all those RocksDB instances that simply exceed the limit. If you have an overview how many parallel operator instances with keyed state were running on the machine and assume some reasonable number of files per RocksDB instance and the limit configured in your OS, could that be the case?




Thanks!


Thanks for your help!




On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s....@data-artisans.com> wrote:

Btw having a trace level log of a restart from a problematic checkpoint could actually be helpful if we cannot find the problem from the previous points. This can give a more detailed view of what checkpoint files are mapped to which operator.


I am having one more question: did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files), and did you deactivate it on the second cluster for the restart or changed your OS settings?




Am 15.05.2018 um 10:09 schrieb Stefan Richter <s....@data-artisans.com>:


What I would like to see from the logs is (also depending a bit on your log level):


- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because I think for checkpoint consistency it should not matter as a checkpoint with such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.


Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:


Thanks all. I'll have to see about sharing the logs & configuration..


Is there something special that you'd like to see from the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.



We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external state path on s3.



The code that we use is:


            env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
            env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


The problematic state that we tried to use was a checkpoint created with this conf.



> Are you using the local recovery feature?


Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.


This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on s3, so I might be able to try to restore it again if needed to debug it. But that would require some tweaking, because I don't want to tangle with the same kafka consumer group offsets or send old data again to production endpoint.


Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.


On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s....@data-artisans.com> wrote:

Hi,


I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?


Best,
Stefan




Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:


We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.



The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete


Our code (simplified):


    private MapState<String, String> mapState;


    public void processElement(..) {
            mapState.put("lastUpdated", ctx.timestamp().toString());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
    }


    public void onTimer(long timestamp, OnTimerContext ctx, ..) {

        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {
            mapState.clear();
        }
    }


Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.


However, after restoring state from a checkpoint, the job kept failing with this error:


Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..


So apparently onTimer was called but lastUpdated wasn't found in the MapState.


The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.



Thank you all for your continued support!


P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.






















Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
Yes, I'm rescaling from a checkpoint.

> that behavior is not guaranteed yet

If restoring + rescaling a checkpoint is not supported properly, I don't
understand why Flink doesn't entirely refuse to restore in that case?

Note that I'm using a rather new 1.5-SNAPSHOT, not release 1.4. To be
exact, the package was built at flink commit 8395508b0401353ed07375e22882e7
581d46ac0e.

I also made this additional test:
- launch again from the checkpoint with the original parallelism=8
- cancel with savepoint (this was after ~7 minutes, during which the job
also created some new incremental checkpoints
- restore the newly created savepoint with parallelism=16
- it works

Now I have restored from the original checkpoint multiple times and it
consistently fails with that java.lang.NumberFormatException when trying
with parallelism=18.

I was able to enable trace-level logs with this change (nothing changed in
logback.xml):

Added these lines to flink-1.5-SNAPSHOT/conf/log4j.properties:

# To debug checkpoint restoring
log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
log4j.logger.org.apache.flink.runtime.state=TRACE
log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
log4j.logger.org.apache.flink.streaming.api.operators=TRACE
log4j.logger.org.apache.flink.streaming.runtime.tasks=TRACE

(This is confusing to me – I had understood that Flink would have migrated
entirely to use logback configuration instead of log4j?)

I modified the logs to remove sensitive information, but because I'm not
100% sure that I caught everything, I will share the logs personally with
Stefan only.

On Wed, May 16, 2018 at 5:22 AM, sihua zhou <su...@163.com> wrote:

> Hi Juho,
> if I'm not misunderstand, you saied your're rescaling the job from the
> checkpoint? If yes, I think that behavior is not guaranteed yet, you can
> find this on the doc https://ci.apache.org/proj
> ects/flink/flink-docs-release-1.4/ops/state/checkpoints.html
> #difference-to-savepoints. So, I not sure whether this is a "bug" at
> current stage(personally I'd like to dig it out because currently we also
> use the checkpoint like the way you are) ...
>
> Best, Sihua
>
> On 05/16/2018 01:46,Juho Autio<ju...@rovio.com>
> <ju...@rovio.com> wrote:
>
> I was able to reproduce this error.
>
> I just happened to notice an important detail about the original failure:
> - checkpoint was created with a 1-node cluster (parallelism=8)
> - restored on a 2-node cluster (parallelism=16), caused that null
> exception
>
> I tried restoring again from the problematic checkpoint again
> - restored on a 1-node cluster, no problems
> - restored on a 2-node cluster, getting the original error!
>
> So now I have a way to reproduce the bug. To me it seems like the
> checkpoint itself is fine. The bug seems to be in redistributing the state
> of a restored checkpoint to a higher parallelism. I only tested each
> cluster size once (as described above) so it could also be coincidence, but
> seems at least likely now that it's about the state redistribution.
>
> I'll try to follow up with those TRACE-level logs tomorrow. Today I tried
> adding these to the logback.xml, but I didn't get anything else but INFO
> level logs:
>
>     <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.runtime.state" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>
> Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink
> 1.5-SNAPSHOT and the package has all of these in the conf/ dir:
>
> log4j-cli.properties
> log4j-console.properties
> log4j.properties
> log4j-yarn-session.properties
> logback-console.xml
> logback.xml
> logback-yarn.xml
>
> On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <
> s.richter@data-artisans.com> wrote:
>
>> Hi,
>>
>> Am 15.05.2018 um 10:34 schrieb Juho Autio <ju...@rovio.com>:
>>
>> Ok, that should be possible to provide. Are there any specific packages
>> to set on trace level? Maybe just go with org.apache.flink.* on TRACE?
>>
>>
>> The following packages would be helpful:
>>
>> org.apache.flink.contrib.streaming.state.*
>> org.apache.flink.runtime.state.*
>> org.apache.flink.runtime.checkpoint.*
>> org.apache.flink.streaming.api.operators.*
>> org.apache.flink.streaming.runtime.tasks.*
>>
>>
>> > did the „too many open files“ problem only happen with local recovery
>> (asking since it should actually not add the the amount of open files)
>>
>> I think it happened in various places, maybe not when restoring.. Any way
>> if the situation is like that, the system is pretty much unusable (on OS
>> level), so it shouldn't matter too much which operation of the application
>> it causes to fail? Any way I'll try to grab & share all log lines that say
>> "Too Many Open Files"..
>>
>> > and did you deactivate it on the second cluster for the restart or
>> changed your OS settings?
>>
>> No, didn't change anything except for increasing the ulimit on OS to
>> prevent this from happening again. Note that the system only ran out of
>> files after ~11 days of uptime. During that time there had been some local
>> recoveries. This makes me wonder though, could it be that many local
>> recoveries eventually caused this – could it be that in the occasion of
>> local recovery some "old" files are left open, making the system eventually
>> run out of files?
>>
>>
>>
>> From the way how local recovery works with incremental RocksDB
>> checkpoints, I would not assume that it is the cause of the problem. In
>> this particular case, the number of opened files on a local FS should not
>> be higher than the number without local recovery. Maybe it is just a matter
>> of the OS limit and the number of operators with a RocksDB backend running
>> on the machine and the amount of files managed by all those RocksDB
>> instances that simply exceed the limit. If you have an overview how many
>> parallel operator instances with keyed state were running on the machine
>> and assume some reasonable number of files per RocksDB instance and the
>> limit configured in your OS, could that be the case?
>>
>>
>> Thanks!
>>
>>
>> Thanks for your help!
>>
>>
>> On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <
>> s.richter@data-artisans.com> wrote:
>>
>>> Btw having a trace level log of a restart from a problematic checkpoint
>>> could actually be helpful if we cannot find the problem from the previous
>>> points. This can give a more detailed view of what checkpoint files are
>>> mapped to which operator.
>>>
>>> I am having one more question: did the „too many open files“ problem
>>> only happen with local recovery (asking since it should actually not add
>>> the the amount of open files), and did you deactivate it on the second
>>> cluster for the restart or changed your OS settings?
>>>
>>>
>>> Am 15.05.2018 um 10:09 schrieb Stefan Richter <
>>> s.richter@data-artisans.com>:
>>>
>>> What I would like to see from the logs is (also depending a bit on your
>>> log level):
>>>
>>> - all exceptions.
>>> - in which context exactly the „too many open files“ problem occurred,
>>> because I think for checkpoint consistency it should not matter as a
>>> checkpoint with such a problem should never succeed.
>>> - files that are written for checkpoints/savepoints.
>>> - completed checkpoints/savepoints ids.
>>> - the restored checkpoint/savepoint id.
>>> - files that are loaded on restore.
>>>
>>> Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:
>>>
>>> Thanks all. I'll have to see about sharing the logs & configuration..
>>>
>>> Is there something special that you'd like to see from the logs? It may
>>> be easier for me to get specific lines and obfuscate sensitive information
>>> instead of trying to do that for the full logs.
>>>
>>> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true,
>>> external state path on s3.
>>>
>>> The code that we use is:
>>>
>>>             env.setStateBackend(getStateBackend(statePath, new
>>> RocksDBStateBackend(statePath, true)));
>>>             env.getCheckpointConfig().setMinPauseBetweenCheckpoints(para
>>> ms.getLong("checkpoint.minPause", 60 * 1000));
>>>             env.getCheckpointConfig().setMaxConcurrentCheckpoints(params
>>> .getInt("checkpoint.maxConcurrent", 1));
>>>             env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
>>> 10 * 60 * 1000));
>>>             env.getCheckpointConfig().enableExternalizedCheckpoints(Chec
>>> kpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>
>>> The problematic state that we tried to use was a checkpoint created with
>>> this conf.
>>>
>>> > Are you using the local recovery feature?
>>>
>>> Yes, and in this particular case the job was constantly
>>> failing/restarting because of Too Many Open Files. So we terminated the
>>> cluster entirely, created a new one, and launched a new job by specifying
>>> the latest checkpoint path to restore state from.
>>>
>>> This is the only time I have seen this error happen with timer state. I
>>> still have that bad checkpoint data on s3, so I might be able to try to
>>> restore it again if needed to debug it. But that would require some
>>> tweaking, because I don't want to tangle with the same kafka consumer group
>>> offsets or send old data again to production endpoint.
>>>
>>> Please keep in mind that there was that Too Many Open Files issue on the
>>> cluster that created the problematic checkpoint, if you think that's
>>> relevant.
>>>
>>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <
>>> s.richter@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I agree, this looks like a bug. Can you tell us your exact
>>>> configuration of the state backend, e.g. if you are using incremental
>>>> checkpoints or not. Are you using the local recovery feature? Are you
>>>> restarting the job from a checkpoint or a savepoint? Can you provide logs
>>>> for both the job that failed and the restarted job?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>>
>>>> Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:
>>>>
>>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear
>>>> old state. After restoring state from a checkpoint, it seems like a timer
>>>> had been restored, but not the data that was expected to be in a related
>>>> MapState if such timer has been added.
>>>>
>>>> The way I see this is that there's a bug, either of these:
>>>> - The writing of timers & map states to Flink state is not synchronized
>>>> (or maybe there are no such guarantees by design?)
>>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>>>
>>>> Our code (simplified):
>>>>
>>>>     private MapState<String, String> mapState;
>>>>
>>>>     public void processElement(..) {
>>>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp()
>>>> + stateRetentionMillis);
>>>>     }
>>>>
>>>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>>             mapState.clear();
>>>>         }
>>>>     }
>>>>
>>>> Normally this "just works". As you can see, it shouldn't be possible
>>>> that "lastUpdated" doesn't exist in state if timer was registered and
>>>> onTimer gets called.
>>>>
>>>> However, after restoring state from a checkpoint, the job kept failing
>>>> with this error:
>>>>
>>>> Caused by: java.lang.NumberFormatException: null
>>>> at java.lang.Long.parseLong(Long.java:552)
>>>> at java.lang.Long.parseLong(Long.java:631)
>>>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunct
>>>> ion.java:136)
>>>> ..
>>>>
>>>> So apparently onTimer was called but lastUpdated wasn't found in the
>>>> MapState.
>>>>
>>>> The background for restoring state in this case is not entirely clean.
>>>> There was an OS level issue "Too many open files" after running a job for
>>>> ~11 days. To fix that, we replaced the cluster with a new one and launched
>>>> the Flink job again. State was successfully restored from the latest
>>>> checkpoint that had been created by the "problematic execution". Now, I'm
>>>> assuming that if the state wouldn't have been created successfully,
>>>> restoring wouldn't succeed either – correct? This is just to rule out that
>>>> the issue with state didn't happen because the checkpoint files were
>>>> somehow corrupted due to the Too many open files problem.
>>>>
>>>> Thank you all for your continued support!
>>>>
>>>> P.S. I would be very much interested to hear if there's some cleaner
>>>> way to achieve this kind of TTL for keyed state in Flink.
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>
>

Re: Missing MapState when Timer fires after restored state

Posted by sihua zhou <su...@163.com>.
Hi Juho,
if I'm not misunderstand, you saied your're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet, you can find this on the doc https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints. So, I not sure whether this is a "bug" at current stage(personally I'd like to dig it out because currently we also use the checkpoint like the way you are) ...


Best, Sihua


On 05/16/2018 01:46,Juho Autio<ju...@rovio.com> wrote:
I was able to reproduce this error.


I just happened to notice an important detail about the original failure:

- checkpoint was created with a 1-node cluster (parallelism=8)
- restored on a 2-node cluster (parallelism=16), caused that null exception



I tried restoring again from the problematic checkpoint again

- restored on a 1-node cluster, no problems
- restored on a 2-node cluster, getting the original error!


So now I have a way to reproduce the bug. To me it seems like the checkpoint itself is fine. The bug seems to be in redistributing the state of a restored checkpoint to a higher parallelism. I only tested each cluster size once (as described above) so it could also be coincidence, but seems at least likely now that it's about the state redistribution.


I'll try to follow up with those TRACE-level logs tomorrow. Today I tried adding these to the logback.xml, but I didn't get anything else but INFO level logs:



    <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
        <appender-ref ref="file"/>
    </logger>


Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink 1.5-SNAPSHOT and the package has all of these in the conf/ dir:


log4j-cli.properties
log4j-console.properties
log4j.properties
log4j-yarn-session.properties
logback-console.xml
logback.xml
logback-yarn.xml


On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <s....@data-artisans.com> wrote:

Hi,



Am 15.05.2018 um 10:34 schrieb Juho Autio <ju...@rovio.com>:


Ok, that should be possible to provide. Are there any specific packages to set on trace level? Maybe just go with org.apache.flink.* on TRACE?


The following packages would be helpful:


org.apache.flink.contrib.streaming.state.*
org.apache.flink.runtime.state.*
org.apache.flink.runtime.checkpoint.*
org.apache.flink.streaming.api.operators.*
org.apache.flink.streaming.runtime.tasks.*




> did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files)


I think it happened in various places, maybe not when restoring.. Any way if the situation is like that, the system is pretty much unusable (on OS level), so it shouldn't matter too much which operation of the application it causes to fail? Any way I'll try to grab & share all log lines that say "Too Many Open Files"..


> and did you deactivate it on the second cluster for the restart or changed your OS settings?



No, didn't change anything except for increasing the ulimit on OS to prevent this from happening again. Note that the system only ran out of files after ~11 days of uptime. During that time there had been some local recoveries. This makes me wonder though, could it be that many local recoveries eventually caused this – could it be that in the occasion of local recovery some "old" files are left open, making the system eventually run out of files?




From the way how local recovery works with incremental RocksDB checkpoints, I would not assume that it is the cause of the problem. In this particular case, the number of opened files on a local FS should not be higher than the number without local recovery. Maybe it is just a matter of the OS limit and the number of operators with a RocksDB backend running on the machine and the amount of files managed by all those RocksDB instances that simply exceed the limit. If you have an overview how many parallel operator instances with keyed state were running on the machine and assume some reasonable number of files per RocksDB instance and the limit configured in your OS, could that be the case?




Thanks!


Thanks for your help!




On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s....@data-artisans.com> wrote:

Btw having a trace level log of a restart from a problematic checkpoint could actually be helpful if we cannot find the problem from the previous points. This can give a more detailed view of what checkpoint files are mapped to which operator.


I am having one more question: did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files), and did you deactivate it on the second cluster for the restart or changed your OS settings?




Am 15.05.2018 um 10:09 schrieb Stefan Richter <s....@data-artisans.com>:


What I would like to see from the logs is (also depending a bit on your log level):


- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because I think for checkpoint consistency it should not matter as a checkpoint with such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.


Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:


Thanks all. I'll have to see about sharing the logs & configuration..


Is there something special that you'd like to see from the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.



We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external state path on s3.



The code that we use is:


            env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
            env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


The problematic state that we tried to use was a checkpoint created with this conf.



> Are you using the local recovery feature?


Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.


This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on s3, so I might be able to try to restore it again if needed to debug it. But that would require some tweaking, because I don't want to tangle with the same kafka consumer group offsets or send old data again to production endpoint.


Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.


On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s....@data-artisans.com> wrote:

Hi,


I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?


Best,
Stefan




Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:


We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.



The way I see this is that there's a bug, either of these:
- The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete


Our code (simplified):


    private MapState<String, String> mapState;


    public void processElement(..) {
            mapState.put("lastUpdated", ctx.timestamp().toString());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
    }


    public void onTimer(long timestamp, OnTimerContext ctx, ..) {

        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {
            mapState.clear();
        }
    }


Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.


However, after restoring state from a checkpoint, the job kept failing with this error:


Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..


So apparently onTimer was called but lastUpdated wasn't found in the MapState.


The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.



Thank you all for your continued support!


P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.




















Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
I was able to reproduce this error.

I just happened to notice an important detail about the original failure:
- checkpoint was created with a 1-node cluster (parallelism=8)
- restored on a 2-node cluster (parallelism=16), caused that null exception

I tried restoring again from the problematic checkpoint again
- restored on a 1-node cluster, no problems
- restored on a 2-node cluster, getting the original error!

So now I have a way to reproduce the bug. To me it seems like the
checkpoint itself is fine. The bug seems to be in redistributing the state
of a restored checkpoint to a higher parallelism. I only tested each
cluster size once (as described above) so it could also be coincidence, but
seems at least likely now that it's about the state redistribution.

I'll try to follow up with those TRACE-level logs tomorrow. Today I tried
adding these to the logback.xml, but I didn't get anything else but INFO
level logs:

    <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.state" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
        <appender-ref ref="file"/>
    </logger>
    <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
        <appender-ref ref="file"/>
    </logger>

Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink
1.5-SNAPSHOT and the package has all of these in the conf/ dir:

log4j-cli.properties
log4j-console.properties
log4j.properties
log4j-yarn-session.properties
logback-console.xml
logback.xml
logback-yarn.xml

On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <
s.richter@data-artisans.com> wrote:

> Hi,
>
> Am 15.05.2018 um 10:34 schrieb Juho Autio <ju...@rovio.com>:
>
> Ok, that should be possible to provide. Are there any specific packages to
> set on trace level? Maybe just go with org.apache.flink.* on TRACE?
>
>
> The following packages would be helpful:
>
> org.apache.flink.contrib.streaming.state.*
> org.apache.flink.runtime.state.*
> org.apache.flink.runtime.checkpoint.*
> org.apache.flink.streaming.api.operators.*
> org.apache.flink.streaming.runtime.tasks.*
>
>
> > did the „too many open files“ problem only happen with local recovery
> (asking since it should actually not add the the amount of open files)
>
> I think it happened in various places, maybe not when restoring.. Any way
> if the situation is like that, the system is pretty much unusable (on OS
> level), so it shouldn't matter too much which operation of the application
> it causes to fail? Any way I'll try to grab & share all log lines that say
> "Too Many Open Files"..
>
> > and did you deactivate it on the second cluster for the restart or
> changed your OS settings?
>
> No, didn't change anything except for increasing the ulimit on OS to
> prevent this from happening again. Note that the system only ran out of
> files after ~11 days of uptime. During that time there had been some local
> recoveries. This makes me wonder though, could it be that many local
> recoveries eventually caused this – could it be that in the occasion of
> local recovery some "old" files are left open, making the system eventually
> run out of files?
>
>
>
> From the way how local recovery works with incremental RocksDB
> checkpoints, I would not assume that it is the cause of the problem. In
> this particular case, the number of opened files on a local FS should not
> be higher than the number without local recovery. Maybe it is just a matter
> of the OS limit and the number of operators with a RocksDB backend running
> on the machine and the amount of files managed by all those RocksDB
> instances that simply exceed the limit. If you have an overview how many
> parallel operator instances with keyed state were running on the machine
> and assume some reasonable number of files per RocksDB instance and the
> limit configured in your OS, could that be the case?
>
>
> Thanks!
>
>
> Thanks for your help!
>
>
> On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <
> s.richter@data-artisans.com> wrote:
>
>> Btw having a trace level log of a restart from a problematic checkpoint
>> could actually be helpful if we cannot find the problem from the previous
>> points. This can give a more detailed view of what checkpoint files are
>> mapped to which operator.
>>
>> I am having one more question: did the „too many open files“ problem only
>> happen with local recovery (asking since it should actually not add the the
>> amount of open files), and did you deactivate it on the second cluster for
>> the restart or changed your OS settings?
>>
>>
>> Am 15.05.2018 um 10:09 schrieb Stefan Richter <
>> s.richter@data-artisans.com>:
>>
>> What I would like to see from the logs is (also depending a bit on your
>> log level):
>>
>> - all exceptions.
>> - in which context exactly the „too many open files“ problem occurred,
>> because I think for checkpoint consistency it should not matter as a
>> checkpoint with such a problem should never succeed.
>> - files that are written for checkpoints/savepoints.
>> - completed checkpoints/savepoints ids.
>> - the restored checkpoint/savepoint id.
>> - files that are loaded on restore.
>>
>> Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:
>>
>> Thanks all. I'll have to see about sharing the logs & configuration..
>>
>> Is there something special that you'd like to see from the logs? It may
>> be easier for me to get specific lines and obfuscate sensitive information
>> instead of trying to do that for the full logs.
>>
>> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true,
>> external state path on s3.
>>
>> The code that we use is:
>>
>>             env.setStateBackend(getStateBackend(statePath, new
>> RocksDBStateBackend(statePath, true)));
>>             env.getCheckpointConfig().setMinPauseBetweenCheckpoints(para
>> ms.getLong("checkpoint.minPause", 60 * 1000));
>>             env.getCheckpointConfig().setMaxConcurrentCheckpoints(params
>> .getInt("checkpoint.maxConcurrent", 1));
>>             env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
>> 10 * 60 * 1000));
>>             env.getCheckpointConfig().enableExternalizedCheckpoints(Chec
>> kpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>> The problematic state that we tried to use was a checkpoint created with
>> this conf.
>>
>> > Are you using the local recovery feature?
>>
>> Yes, and in this particular case the job was constantly
>> failing/restarting because of Too Many Open Files. So we terminated the
>> cluster entirely, created a new one, and launched a new job by specifying
>> the latest checkpoint path to restore state from.
>>
>> This is the only time I have seen this error happen with timer state. I
>> still have that bad checkpoint data on s3, so I might be able to try to
>> restore it again if needed to debug it. But that would require some
>> tweaking, because I don't want to tangle with the same kafka consumer group
>> offsets or send old data again to production endpoint.
>>
>> Please keep in mind that there was that Too Many Open Files issue on the
>> cluster that created the problematic checkpoint, if you think that's
>> relevant.
>>
>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <
>> s.richter@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I agree, this looks like a bug. Can you tell us your exact configuration
>>> of the state backend, e.g. if you are using incremental checkpoints or not.
>>> Are you using the local recovery feature? Are you restarting the job from a
>>> checkpoint or a savepoint? Can you provide logs for both the job that
>>> failed and the restarted job?
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>> Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:
>>>
>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear
>>> old state. After restoring state from a checkpoint, it seems like a timer
>>> had been restored, but not the data that was expected to be in a related
>>> MapState if such timer has been added.
>>>
>>> The way I see this is that there's a bug, either of these:
>>> - The writing of timers & map states to Flink state is not synchronized
>>> (or maybe there are no such guarantees by design?)
>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>>
>>> Our code (simplified):
>>>
>>>     private MapState<String, String> mapState;
>>>
>>>     public void processElement(..) {
>>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
>>> stateRetentionMillis);
>>>     }
>>>
>>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>             mapState.clear();
>>>         }
>>>     }
>>>
>>> Normally this "just works". As you can see, it shouldn't be possible
>>> that "lastUpdated" doesn't exist in state if timer was registered and
>>> onTimer gets called.
>>>
>>> However, after restoring state from a checkpoint, the job kept failing
>>> with this error:
>>>
>>> Caused by: java.lang.NumberFormatException: null
>>> at java.lang.Long.parseLong(Long.java:552)
>>> at java.lang.Long.parseLong(Long.java:631)
>>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunct
>>> ion.java:136)
>>> ..
>>>
>>> So apparently onTimer was called but lastUpdated wasn't found in the
>>> MapState.
>>>
>>> The background for restoring state in this case is not entirely clean.
>>> There was an OS level issue "Too many open files" after running a job for
>>> ~11 days. To fix that, we replaced the cluster with a new one and launched
>>> the Flink job again. State was successfully restored from the latest
>>> checkpoint that had been created by the "problematic execution". Now, I'm
>>> assuming that if the state wouldn't have been created successfully,
>>> restoring wouldn't succeed either – correct? This is just to rule out that
>>> the issue with state didn't happen because the checkpoint files were
>>> somehow corrupted due to the Too many open files problem.
>>>
>>> Thank you all for your continued support!
>>>
>>> P.S. I would be very much interested to hear if there's some cleaner way
>>> to achieve this kind of TTL for keyed state in Flink.
>>>
>>>
>>>
>>
>>
>>
>>
>
>
>

Re: Missing MapState when Timer fires after restored state

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

> Am 15.05.2018 um 10:34 schrieb Juho Autio <ju...@rovio.com>:
> 
> Ok, that should be possible to provide. Are there any specific packages to set on trace level? Maybe just go with org.apache.flink.* on TRACE?

The following packages would be helpful:

org.apache.flink.contrib.streaming.state.*
org.apache.flink.runtime.state.*
org.apache.flink.runtime.checkpoint.*
org.apache.flink.streaming.api.operators.*
org.apache.flink.streaming.runtime.tasks.*

> 
> > did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files)
> 
> I think it happened in various places, maybe not when restoring.. Any way if the situation is like that, the system is pretty much unusable (on OS level), so it shouldn't matter too much which operation of the application it causes to fail? Any way I'll try to grab & share all log lines that say "Too Many Open Files"..
> 
> > and did you deactivate it on the second cluster for the restart or changed your OS settings?
> 
> No, didn't change anything except for increasing the ulimit on OS to prevent this from happening again. Note that the system only ran out of files after ~11 days of uptime. During that time there had been some local recoveries. This makes me wonder though, could it be that many local recoveries eventually caused this – could it be that in the occasion of local recovery some "old" files are left open, making the system eventually run out of files?


From the way how local recovery works with incremental RocksDB checkpoints, I would not assume that it is the cause of the problem. In this particular case, the number of opened files on a local FS should not be higher than the number without local recovery. Maybe it is just a matter of the OS limit and the number of operators with a RocksDB backend running on the machine and the amount of files managed by all those RocksDB instances that simply exceed the limit. If you have an overview how many parallel operator instances with keyed state were running on the machine and assume some reasonable number of files per RocksDB instance and the limit configured in your OS, could that be the case?

> 
> Thanks!

Thanks for your help!

> 
> On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
> Btw having a trace level log of a restart from a problematic checkpoint could actually be helpful if we cannot find the problem from the previous points. This can give a more detailed view of what checkpoint files are mapped to which operator.
> 
> I am having one more question: did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files), and did you deactivate it on the second cluster for the restart or changed your OS settings?
> 
> 
>> Am 15.05.2018 um 10:09 schrieb Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>> 
>> What I would like to see from the logs is (also depending a bit on your log level):
>> 
>> - all exceptions.
>> - in which context exactly the „too many open files“ problem occurred, because I think for checkpoint consistency it should not matter as a checkpoint with such a problem should never succeed.
>> - files that are written for checkpoints/savepoints.
>> - completed checkpoints/savepoints ids.
>> - the restored checkpoint/savepoint id.
>> - files that are loaded on restore.
>> 
>>> Am 15.05.2018 um 10:02 schrieb Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>>> 
>>> Thanks all. I'll have to see about sharing the logs & configuration..
>>> 
>>> Is there something special that you'd like to see from the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.
>>> 
>>> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external state path on s3.
>>> 
>>> The code that we use is:
>>> 
>>>             env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
>>>             env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
>>>             env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
>>>             env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>>>             env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> 
>>> The problematic state that we tried to use was a checkpoint created with this conf.
>>> 
>>> > Are you using the local recovery feature?
>>> 
>>> Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.
>>> 
>>> This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on s3, so I might be able to try to restore it again if needed to debug it. But that would require some tweaking, because I don't want to tangle with the same kafka consumer group offsets or send old data again to production endpoint.
>>> 
>>> Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.
>>> 
>>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
>>> Hi,
>>> 
>>> I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?
>>> 
>>> Best,
>>> Stefan
>>> 
>>> 
>>>> Am 14.05.2018 um 13:00 schrieb Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>>>> 
>>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.
>>>> 
>>>> The way I see this is that there's a bug, either of these:
>>>> - The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
>>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>>> 
>>>> Our code (simplified):
>>>> 
>>>>     private MapState<String, String> mapState;
>>>> 
>>>>     public void processElement(..) {
>>>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>>>>     }
>>>> 
>>>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>>             mapState.clear();
>>>>         }
>>>>     }
>>>> 
>>>> Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.
>>>> 
>>>> However, after restoring state from a checkpoint, the job kept failing with this error:
>>>> 
>>>> Caused by: java.lang.NumberFormatException: null
>>>> at java.lang.Long.parseLong(Long.java:552)
>>>> at java.lang.Long.parseLong(Long.java:631)
>>>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>>>> ..
>>>> 
>>>> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
>>>> 
>>>> The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.
>>>> 
>>>> Thank you all for your continued support!
>>>> 
>>>> P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.
>>> 
>>> 
>>> 
>> 
> 
> 
> 


Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
> Btw having a trace level log of a restart from a problematic checkpoint
could actually be helpful

Ok, that should be possible to provide. Are there any specific packages to
set on trace level? Maybe just go with org.apache.flink.* on TRACE?

> did the „too many open files“ problem only happen with local recovery
(asking since it should actually not add the the amount of open files)

I think it happened in various places, maybe not when restoring.. Any way
if the situation is like that, the system is pretty much unusable (on OS
level), so it shouldn't matter too much which operation of the application
it causes to fail? Any way I'll try to grab & share all log lines that say
"Too Many Open Files"..

> and did you deactivate it on the second cluster for the restart or
changed your OS settings?

No, didn't change anything except for increasing the ulimit on OS to
prevent this from happening again. Note that the system only ran out of
files after ~11 days of uptime. During that time there had been some local
recoveries. This makes me wonder though, could it be that many local
recoveries eventually caused this – could it be that in the occasion of
local recovery some "old" files are left open, making the system eventually
run out of files?

Thanks!

On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <
s.richter@data-artisans.com> wrote:

> Btw having a trace level log of a restart from a problematic checkpoint
> could actually be helpful if we cannot find the problem from the previous
> points. This can give a more detailed view of what checkpoint files are
> mapped to which operator.
>
> I am having one more question: did the „too many open files“ problem only
> happen with local recovery (asking since it should actually not add the the
> amount of open files), and did you deactivate it on the second cluster for
> the restart or changed your OS settings?
>
>
> Am 15.05.2018 um 10:09 schrieb Stefan Richter <s.richter@data-artisans.com
> >:
>
> What I would like to see from the logs is (also depending a bit on your
> log level):
>
> - all exceptions.
> - in which context exactly the „too many open files“ problem occurred,
> because I think for checkpoint consistency it should not matter as a
> checkpoint with such a problem should never succeed.
> - files that are written for checkpoints/savepoints.
> - completed checkpoints/savepoints ids.
> - the restored checkpoint/savepoint id.
> - files that are loaded on restore.
>
> Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:
>
> Thanks all. I'll have to see about sharing the logs & configuration..
>
> Is there something special that you'd like to see from the logs? It may be
> easier for me to get specific lines and obfuscate sensitive information
> instead of trying to do that for the full logs.
>
> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true,
> external state path on s3.
>
> The code that we use is:
>
>             env.setStateBackend(getStateBackend(statePath, new
> RocksDBStateBackend(statePath, true)));
>             env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
> params.getLong("checkpoint.minPause", 60 * 1000));
>             env.getCheckpointConfig().setMaxConcurrentCheckpoints(
> params.getInt("checkpoint.maxConcurrent", 1));
>             env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
> 10 * 60 * 1000));
>             env.getCheckpointConfig().enableExternalizedCheckpoints(
> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> The problematic state that we tried to use was a checkpoint created with
> this conf.
>
> > Are you using the local recovery feature?
>
> Yes, and in this particular case the job was constantly failing/restarting
> because of Too Many Open Files. So we terminated the cluster entirely,
> created a new one, and launched a new job by specifying the latest
> checkpoint path to restore state from.
>
> This is the only time I have seen this error happen with timer state. I
> still have that bad checkpoint data on s3, so I might be able to try to
> restore it again if needed to debug it. But that would require some
> tweaking, because I don't want to tangle with the same kafka consumer group
> offsets or send old data again to production endpoint.
>
> Please keep in mind that there was that Too Many Open Files issue on the
> cluster that created the problematic checkpoint, if you think that's
> relevant.
>
> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <
> s.richter@data-artisans.com> wrote:
>
>> Hi,
>>
>> I agree, this looks like a bug. Can you tell us your exact configuration
>> of the state backend, e.g. if you are using incremental checkpoints or not.
>> Are you using the local recovery feature? Are you restarting the job from a
>> checkpoint or a savepoint? Can you provide logs for both the job that
>> failed and the restarted job?
>>
>> Best,
>> Stefan
>>
>>
>> Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:
>>
>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear
>> old state. After restoring state from a checkpoint, it seems like a timer
>> had been restored, but not the data that was expected to be in a related
>> MapState if such timer has been added.
>>
>> The way I see this is that there's a bug, either of these:
>> - The writing of timers & map states to Flink state is not synchronized
>> (or maybe there are no such guarantees by design?)
>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>
>> Our code (simplified):
>>
>>     private MapState<String, String> mapState;
>>
>>     public void processElement(..) {
>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
>> stateRetentionMillis);
>>     }
>>
>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>             mapState.clear();
>>         }
>>     }
>>
>> Normally this "just works". As you can see, it shouldn't be possible that
>> "lastUpdated" doesn't exist in state if timer was registered and onTimer
>> gets called.
>>
>> However, after restoring state from a checkpoint, the job kept failing
>> with this error:
>>
>> Caused by: java.lang.NumberFormatException: null
>> at java.lang.Long.parseLong(Long.java:552)
>> at java.lang.Long.parseLong(Long.java:631)
>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunct
>> ion.java:136)
>> ..
>>
>> So apparently onTimer was called but lastUpdated wasn't found in the
>> MapState.
>>
>> The background for restoring state in this case is not entirely clean.
>> There was an OS level issue "Too many open files" after running a job for
>> ~11 days. To fix that, we replaced the cluster with a new one and launched
>> the Flink job again. State was successfully restored from the latest
>> checkpoint that had been created by the "problematic execution". Now, I'm
>> assuming that if the state wouldn't have been created successfully,
>> restoring wouldn't succeed either – correct? This is just to rule out that
>> the issue with state didn't happen because the checkpoint files were
>> somehow corrupted due to the Too many open files problem.
>>
>> Thank you all for your continued support!
>>
>> P.S. I would be very much interested to hear if there's some cleaner way
>> to achieve this kind of TTL for keyed state in Flink.
>>
>>
>>
>
>
>
>

Re: Missing MapState when Timer fires after restored state

Posted by Stefan Richter <s....@data-artisans.com>.
Btw having a trace level log of a restart from a problematic checkpoint could actually be helpful if we cannot find the problem from the previous points. This can give a more detailed view of what checkpoint files are mapped to which operator.

I am having one more question: did the „too many open files“ problem only happen with local recovery (asking since it should actually not add the the amount of open files), and did you deactivate it on the second cluster for the restart or changed your OS settings?

> Am 15.05.2018 um 10:09 schrieb Stefan Richter <s....@data-artisans.com>:
> 
> What I would like to see from the logs is (also depending a bit on your log level):
> 
> - all exceptions.
> - in which context exactly the „too many open files“ problem occurred, because I think for checkpoint consistency it should not matter as a checkpoint with such a problem should never succeed.
> - files that are written for checkpoints/savepoints.
> - completed checkpoints/savepoints ids.
> - the restored checkpoint/savepoint id.
> - files that are loaded on restore.
> 
>> Am 15.05.2018 um 10:02 schrieb Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>> 
>> Thanks all. I'll have to see about sharing the logs & configuration..
>> 
>> Is there something special that you'd like to see from the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.
>> 
>> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external state path on s3.
>> 
>> The code that we use is:
>> 
>>             env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
>>             env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
>>             env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
>>             env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>>             env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> 
>> The problematic state that we tried to use was a checkpoint created with this conf.
>> 
>> > Are you using the local recovery feature?
>> 
>> Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.
>> 
>> This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on s3, so I might be able to try to restore it again if needed to debug it. But that would require some tweaking, because I don't want to tangle with the same kafka consumer group offsets or send old data again to production endpoint.
>> 
>> Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.
>> 
>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
>> Hi,
>> 
>> I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?
>> 
>> Best,
>> Stefan
>> 
>> 
>>> Am 14.05.2018 um 13:00 schrieb Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>>> 
>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.
>>> 
>>> The way I see this is that there's a bug, either of these:
>>> - The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>> 
>>> Our code (simplified):
>>> 
>>>     private MapState<String, String> mapState;
>>> 
>>>     public void processElement(..) {
>>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>>>     }
>>> 
>>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>             mapState.clear();
>>>         }
>>>     }
>>> 
>>> Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.
>>> 
>>> However, after restoring state from a checkpoint, the job kept failing with this error:
>>> 
>>> Caused by: java.lang.NumberFormatException: null
>>> at java.lang.Long.parseLong(Long.java:552)
>>> at java.lang.Long.parseLong(Long.java:631)
>>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>>> ..
>>> 
>>> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
>>> 
>>> The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.
>>> 
>>> Thank you all for your continued support!
>>> 
>>> P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.
>> 
>> 
>> 
> 


Re: Missing MapState when Timer fires after restored state

Posted by Stefan Richter <s....@data-artisans.com>.
What I would like to see from the logs is (also depending a bit on your log level):

- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because I think for checkpoint consistency it should not matter as a checkpoint with such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.

> Am 15.05.2018 um 10:02 schrieb Juho Autio <ju...@rovio.com>:
> 
> Thanks all. I'll have to see about sharing the logs & configuration..
> 
> Is there something special that you'd like to see from the logs? It may be easier for me to get specific lines and obfuscate sensitive information instead of trying to do that for the full logs.
> 
> We basically have: RocksDBStateBackend with enableIncrementalCheckpointing=true, external state path on s3.
> 
> The code that we use is:
> 
>             env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
>             env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause", 60 * 1000));
>             env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent", 1));
>             env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>             env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 
> The problematic state that we tried to use was a checkpoint created with this conf.
> 
> > Are you using the local recovery feature?
> 
> Yes, and in this particular case the job was constantly failing/restarting because of Too Many Open Files. So we terminated the cluster entirely, created a new one, and launched a new job by specifying the latest checkpoint path to restore state from.
> 
> This is the only time I have seen this error happen with timer state. I still have that bad checkpoint data on s3, so I might be able to try to restore it again if needed to debug it. But that would require some tweaking, because I don't want to tangle with the same kafka consumer group offsets or send old data again to production endpoint.
> 
> Please keep in mind that there was that Too Many Open Files issue on the cluster that created the problematic checkpoint, if you think that's relevant.
> 
> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>> wrote:
> Hi,
> 
> I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?
> 
> Best,
> Stefan
> 
> 
>> Am 14.05.2018 um 13:00 schrieb Juho Autio <juho.autio@rovio.com <ma...@rovio.com>>:
>> 
>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.
>> 
>> The way I see this is that there's a bug, either of these:
>> - The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>> 
>> Our code (simplified):
>> 
>>     private MapState<String, String> mapState;
>> 
>>     public void processElement(..) {
>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>>     }
>> 
>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>             mapState.clear();
>>         }
>>     }
>> 
>> Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.
>> 
>> However, after restoring state from a checkpoint, the job kept failing with this error:
>> 
>> Caused by: java.lang.NumberFormatException: null
>> at java.lang.Long.parseLong(Long.java:552)
>> at java.lang.Long.parseLong(Long.java:631)
>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>> ..
>> 
>> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
>> 
>> The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.
>> 
>> Thank you all for your continued support!
>> 
>> P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.
> 
> 
> 


Re: Missing MapState when Timer fires after restored state

Posted by Juho Autio <ju...@rovio.com>.
Thanks all. I'll have to see about sharing the logs & configuration..

Is there something special that you'd like to see from the logs? It may be
easier for me to get specific lines and obfuscate sensitive information
instead of trying to do that for the full logs.

We basically have: RocksDBStateBackend with
enableIncrementalCheckpointing=true,
external state path on s3.

The code that we use is:

            env.setStateBackend(getStateBackend(statePath, new
RocksDBStateBackend(statePath, true)));

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause",
60 * 1000));

env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent",
1));

env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
10 * 60 * 1000));

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

The problematic state that we tried to use was a checkpoint created with
this conf.

> Are you using the local recovery feature?

Yes, and in this particular case the job was constantly failing/restarting
because of Too Many Open Files. So we terminated the cluster entirely,
created a new one, and launched a new job by specifying the latest
checkpoint path to restore state from.

This is the only time I have seen this error happen with timer state. I
still have that bad checkpoint data on s3, so I might be able to try to
restore it again if needed to debug it. But that would require some
tweaking, because I don't want to tangle with the same kafka consumer group
offsets or send old data again to production endpoint.

Please keep in mind that there was that Too Many Open Files issue on the
cluster that created the problematic checkpoint, if you think that's
relevant.

On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <
s.richter@data-artisans.com> wrote:

> Hi,
>
> I agree, this looks like a bug. Can you tell us your exact configuration
> of the state backend, e.g. if you are using incremental checkpoints or not.
> Are you using the local recovery feature? Are you restarting the job from a
> checkpoint or a savepoint? Can you provide logs for both the job that
> failed and the restarted job?
>
> Best,
> Stefan
>
>
> Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:
>
> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
> state. After restoring state from a checkpoint, it seems like a timer had
> been restored, but not the data that was expected to be in a related
> MapState if such timer has been added.
>
> The way I see this is that there's a bug, either of these:
> - The writing of timers & map states to Flink state is not synchronized
> (or maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
>
> Our code (simplified):
>
>     private MapState<String, String> mapState;
>
>     public void processElement(..) {
>             mapState.put("lastUpdated", ctx.timestamp().toString());
>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() +
> stateRetentionMillis);
>     }
>
>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>             mapState.clear();
>         }
>     }
>
> Normally this "just works". As you can see, it shouldn't be possible that
> "lastUpdated" doesn't exist in state if timer was registered and onTimer
> gets called.
>
> However, after restoring state from a checkpoint, the job kept failing
> with this error:
>
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.
> java:136)
> ..
>
> So apparently onTimer was called but lastUpdated wasn't found in the
> MapState.
>
> The background for restoring state in this case is not entirely clean.
> There was an OS level issue "Too many open files" after running a job for
> ~11 days. To fix that, we replaced the cluster with a new one and launched
> the Flink job again. State was successfully restored from the latest
> checkpoint that had been created by the "problematic execution". Now, I'm
> assuming that if the state wouldn't have been created successfully,
> restoring wouldn't succeed either – correct? This is just to rule out that
> the issue with state didn't happen because the checkpoint files were
> somehow corrupted due to the Too many open files problem.
>
> Thank you all for your continued support!
>
> P.S. I would be very much interested to hear if there's some cleaner way
> to achieve this kind of TTL for keyed state in Flink.
>
>
>

Re: Missing MapState when Timer fires after restored state

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I agree, this looks like a bug. Can you tell us your exact configuration of the state backend, e.g. if you are using incremental checkpoints or not. Are you using the local recovery feature? Are you restarting the job from a checkpoint or a savepoint? Can you provide logs for both the job that failed and the restarted job?

Best,
Stefan

> Am 14.05.2018 um 13:00 schrieb Juho Autio <ju...@rovio.com>:
> 
> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old state. After restoring state from a checkpoint, it seems like a timer had been restored, but not the data that was expected to be in a related MapState if such timer has been added.
> 
> The way I see this is that there's a bug, either of these:
> - The writing of timers & map states to Flink state is not synchronized (or maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
> 
> Our code (simplified):
> 
>     private MapState<String, String> mapState;
> 
>     public void processElement(..) {
>             mapState.put("lastUpdated", ctx.timestamp().toString());
>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>     }
> 
>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>             mapState.clear();
>         }
>     }
> 
> Normally this "just works". As you can see, it shouldn't be possible that "lastUpdated" doesn't exist in state if timer was registered and onTimer gets called.
> 
> However, after restoring state from a checkpoint, the job kept failing with this error:
> 
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:552)
> at java.lang.Long.parseLong(Long.java:631)
> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
> ..
> 
> So apparently onTimer was called but lastUpdated wasn't found in the MapState.
> 
> The background for restoring state in this case is not entirely clean. There was an OS level issue "Too many open files" after running a job for ~11 days. To fix that, we replaced the cluster with a new one and launched the Flink job again. State was successfully restored from the latest checkpoint that had been created by the "problematic execution". Now, I'm assuming that if the state wouldn't have been created successfully, restoring wouldn't succeed either – correct? This is just to rule out that the issue with state didn't happen because the checkpoint files were somehow corrupted due to the Too many open files problem.
> 
> Thank you all for your continued support!
> 
> P.S. I would be very much interested to hear if there's some cleaner way to achieve this kind of TTL for keyed state in Flink.