Posted to issues@flink.apache.org by "Sihua Zhou (JIRA)" <ji...@apache.org> on 2017/10/31 02:16:00 UTC

[jira] [Updated] (FLINK-7873) Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover

     [ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-7873:
------------------------------
    Description: 
Why I introduce this:
    The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is big (e.g. 1 TB). What's worse, if the job recovers again and again, it can eat up all the network bandwidth and badly hurt the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote storage only if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.

Solution:
    The TaskManager does the caching and manages the cached data itself. It simply uses a TTL-like method to manage the disposal of cache entries: we dispose of an entry if it has not been touched for some time X, and whenever we touch an entry we reset its TTL. This way, all the work is done by the TaskManager and it is transparent to the JobManager. The only problem is that we may dispose of an entry that is still useful; in that case we have to fall back to reading the remote data, but users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.
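
To make this a bit more concrete, here is a minimal sketch of what such a TTL-based cache could look like on the TaskManager side (class and method names are only illustrations for this proposal, not existing Flink API):
{code:java}
import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: a per-TaskManager registry of locally cached
// checkpoint files, evicted when they have not been touched for `ttlMillis`.
public class CheckpointCacheManager {

    private static class CacheEntry {
        final File localFile;
        volatile long lastTouched;

        CacheEntry(File localFile) {
            this.localFile = localFile;
            this.lastTouched = System.currentTimeMillis();
        }
    }

    private final Map<String, CacheEntry> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public CheckpointCacheManager(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Register a local copy written while performing a checkpoint. */
    public void register(String handleId, File localFile) {
        entries.put(handleId, new CacheEntry(localFile));
    }

    /** Look up a cached copy; touching it resets its TTL. Returns null on a miss. */
    public File touch(String handleId) {
        CacheEntry entry = entries.get(handleId);
        if (entry == null) {
            return null;              // caller falls back to the remote copy (HDFS)
        }
        entry.lastTouched = System.currentTimeMillis();
        return entry.localFile;
    }

    /** Called periodically (e.g. from a scheduled task) to drop stale entries. */
    public void evictExpired() {
        long now = System.currentTimeMillis();
        Iterator<CacheEntry> it = entries.values().iterator();
        while (it.hasNext()) {
            CacheEntry entry = it.next();
            if (now - entry.lastTouched > ttlMillis) {
                it.remove();
                entry.localFile.delete();   // disposes the local copy only
            }
        }
    }
}
{code}
Because eviction is purely local and TTL-driven, the JobManager never needs to know about the cache; a too-aggressive TTL only costs us an extra read from HDFS.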

Can someone give me some advice? I would appreciate it very much~

  was:
The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is big (e.g. 1 TB). What's worse, if the job recovers again and again, it can eat up all the network bandwidth and badly hurt the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote storage only if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.


Key problems:
1. Cache the checkpoint data on local disk and manage its creation and deletion.
2. Introduce a hybrid StreamStateHandle (HybridStreamHandle below) which tries to open a local input stream first and, if that fails, opens a remote input stream. Its prototype looks like below:
{code:java}
class HybridStreamHandle implements StreamStateHandle {
   private StreamStateHandle localHandle;
   private StreamStateHandle remoteHandle;
   ......
   @Override
   public FSDataInputStream openInputStream() throws IOException {
        try {
            // prefer the locally cached copy
            return localHandle.openInputStream();
        } catch (IOException e) {
            // local copy missing or unreadable: fall back to the remote copy (HDFS)
            return remoteHandle.openInputStream();
        }
    }
   .....
}
{code}
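
For illustration, recovery could then use the hybrid handle roughly like this (the constructor is an assumption of this sketch, it is not part of the prototype above):
{code:java}
// Hypothetical usage during restore: pair the handle of the locally cached copy
// with the handle of the copy on HDFS, and let the hybrid handle pick the source.
StreamStateHandle localHandle = ...;   // handle for the copy in the TaskManager-local cache
StreamStateHandle remoteHandle = ...;  // handle for the copy on HDFS
StreamStateHandle hybrid = new HybridStreamHandle(localHandle, remoteHandle);

try (FSDataInputStream in = hybrid.openInputStream()) {
    // restore state from `in`; if the local copy was evicted or is unreadable,
    // the bytes transparently come from the remote copy instead
}
{code}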

Solution:
	There are two kinds of solutions I can think of.

solution1:
	The backend does the caching, and the HybridStreamHandle points to both the local and the remote data. The HybridStreamHandle is managed by the CheckpointCoordinator just like other StreamStateHandles, so the CheckpointCoordinator will dispose of it; when the HybridStreamHandle is disposed it calls localHandle.dispose() and remoteHandle.dispose(). With this approach we have to record the TaskManager's info (like its location) in the localHandle and add an entry point in the TaskManager to handle the localHandle dispose message, and we also have to consider the HA situation.
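
To illustrate, the disposal inside the hybrid handle could look roughly like this (a sketch only; in Flink the disposal method on state handles is discardState(), and how the local part actually reaches the owning TaskManager is exactly the open question of this solution):
{code:java}
// Sketch of solution 1's disposal: the CheckpointCoordinator discards the hybrid
// handle like any other handle, and the hybrid handle in turn discards both copies.
@Override
public void discardState() throws Exception {
    localHandle.discardState();   // must somehow reach the TaskManager that owns the local copy
    remoteHandle.discardState();  // deletes the copy on HDFS
}
{code}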

solution2:
	The TaskManager does the caching and manages the cached data itself. It simply uses a TTL-like method to manage the handles' disposal: we dispose of a handle if it has not been touched for some time X. We touch the handles when we recover from a checkpoint or when we perform a checkpoint, and whenever we touch a handle we reset its TTL. This way, all the work is done by the backend and it is transparent to the JobManager. The only problem is that we may dispose of a handle that is still useful, but even in that case we can fall back to reading the remote data, and users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.

To avoid over-complicating the problem, I prefer solution 2. Can someone give me some advice? I would appreciate it very much~


> Introduce CheckpointCacheManager for reading checkpoint data locally when performing failover
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> Why I introduce this:
>     The current recovery strategy always reads checkpoint data from a remote FileStream (HDFS). This costs a lot of bandwidth when the state is big (e.g. 1 TB). What's worse, if the job recovers again and again, it can eat up all the network bandwidth and badly hurt the cluster. So I propose that we cache the checkpoint data locally and read checkpoint data from the local cache whenever we can, reading from the remote storage only if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.
> Solution:
>     The TaskManager does the caching and manages the cached data itself. It simply uses a TTL-like method to manage the disposal of cache entries: we dispose of an entry if it has not been touched for some time X, and whenever we touch an entry we reset its TTL. This way, all the work is done by the TaskManager and it is transparent to the JobManager. The only problem is that we may dispose of an entry that is still useful; in that case we have to fall back to reading the remote data, but users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)