You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Nico Kruber <ni...@data-artisans.com> on 2017/06/13 14:31:21 UTC

[DISCUSS] FLIP-19: Improved BLOB storage architecture

Hi all,
I'd like to initiate a discussion on some architectural changes for the BLOB 
server which finally add a proper cleanup story, remove some dead code, and 
extend the BLOB store's use for off-loaded (large) RPC messages.

Please have a look at FLIP-19 that details the proposed changes:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB
+storage+architecture


Regards
Nico

PS: While doing the re-write, I'd also like to fix some concurrency issues in 
the current code.

Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Posted by Till Rohrmann <tr...@apache.org>.
Hi Biao,

you're right. What you've described is a totally valid use case and we
should design the interfaces such that you can have specialized
implementations for the cases where you can exploit things like a common
DFS. I think Nico's design should include this.

Cheers,
Till

On Fri, Jun 16, 2017 at 4:10 PM, Biao Liu <mm...@gmail.com> wrote:

> Hi Till
>
> I agree with you about the Flink's DC. It is another topic indeed. I just
> thought that we can think more about it before refactoring BLOB service.
> Make sure that it's easy to implement DC on the refactored architecture.
>
> I have another question about BLOB service. Can we abstract the BLOB
> service to some high-level interfaces? May be just some put/get methods in
> the interfaces. Easy to extend will be useful in some scenarios.
>
> For example in Yarn mode, there are some cool features interesting us.
> 1. Yarn can localize files only once in one slave machine, all TMs in the
> same job can share these files. That may save lots of bandwidth for large
> scale jobs or jobs which have large BLOBs.
> 2. We can skip uploading files if they are already on DFS. That's a common
> scenario in distributed cache.
> 3. Even more, actually we don't need a BlobServer component in Yarn mode.
> We can rely on DFS to distribute files. There is always a DFS available in
> Yarn cluster.
>
> If we do so, the BLOB service through network can be the default
> implementation. It could work in any situation. It's also clear that it
> does not dependent on Hadoop explicitly. And we can do some optimization in
> different kinds of clusters without any hacking.
>
> That are just some rough ideas above. But I think well abstracted
> interfaces will be very helpful.
>

Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Posted by Biao Liu <mm...@gmail.com>.
Hi Till

I agree with you about the Flink's DC. It is another topic indeed. I just
thought that we can think more about it before refactoring BLOB service.
Make sure that it's easy to implement DC on the refactored architecture.

I have another question about BLOB service. Can we abstract the BLOB
service to some high-level interfaces? May be just some put/get methods in
the interfaces. Easy to extend will be useful in some scenarios.

For example in Yarn mode, there are some cool features interesting us.
1. Yarn can localize files only once in one slave machine, all TMs in the
same job can share these files. That may save lots of bandwidth for large
scale jobs or jobs which have large BLOBs.
2. We can skip uploading files if they are already on DFS. That's a common
scenario in distributed cache.
3. Even more, actually we don't need a BlobServer component in Yarn mode.
We can rely on DFS to distribute files. There is always a DFS available in
Yarn cluster.

If we do so, the BLOB service through network can be the default
implementation. It could work in any situation. It's also clear that it
does not dependent on Hadoop explicitly. And we can do some optimization in
different kinds of clusters without any hacking.

That are just some rough ideas above. But I think well abstracted
interfaces will be very helpful.

Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Posted by Till Rohrmann <tr...@apache.org>.
Hi Biao,

you're right that the BlobServer won't live in the JM in FLIP-6. Instead it
will either be part of the RM or the dispatcher component depending on the
actual implementation. The requirements for the BlobServer should, however,
be the same.

Concerning the question about Flink's distributed cache, I think this is an
orthogonal topic. I think that we should be able to piggy-back on the
BlobServer once we have refactored it. This should simplify the DC's
implementation a bit.

Cheers,
Till

On Thu, Jun 15, 2017 at 9:16 AM, Biao Liu <mm...@gmail.com> wrote:

> I have the same concern with Chesnay Schepler. AFIK Flink does not support
> DC as well as Mapreduce and Spark. We only support DC in DataSet API. And
> DC in flink do not support local files. Is this a good change to refactor
> DC too?
>
> I have another concern, currently BLOB server has some conflicts with
> FLIP-6 architecture. We start JM while submitting job instead of starting
> it before in FLIP-6. If BLOB server is embedded in JM we can not upload
> jars and files before JM started. But the fact is that we need jars
> uploaded before starting JM. Correct me is I was wrong.
> To solve this problem we can separate submitting job into different stage.
> Or we can separate BLOB server as a independent component parallel with RM.
>
> Maybe we can think more about these in FLIP-19, what do you think? @Nico
>

Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Posted by Biao Liu <mm...@gmail.com>.
I have the same concern with Chesnay Schepler. AFIK Flink does not support
DC as well as Mapreduce and Spark. We only support DC in DataSet API. And
DC in flink do not support local files. Is this a good change to refactor
DC too?

I have another concern, currently BLOB server has some conflicts with
FLIP-6 architecture. We start JM while submitting job instead of starting
it before in FLIP-6. If BLOB server is embedded in JM we can not upload
jars and files before JM started. But the fact is that we need jars
uploaded before starting JM. Correct me is I was wrong.
To solve this problem we can separate submitting job into different stage.
Or we can separate BLOB server as a independent component parallel with RM.

Maybe we can think more about these in FLIP-19, what do you think? @Nico

Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Posted by Chesnay Schepler <ch...@apache.org>.
The DC does delete local files. The user requests the file through the 
RuntimeContext, but
the download and local caching is completely handled by Flink.

The problems you mention is exactly why I'm suggesting to rebuild the DC 
on-top of the Blob service.

If a user currently wants to use the for a local file he has to do the 
following:

 1. upload the file to dfs
 2. register file with flink
 3. delete distributed file

But in step 2, couldn't we load the file into the Blob storage and 
remove it once the job finishes?

On 14.06.2017 15:23, Nico Kruber wrote:
> Actually, I don't see the automatic cleanup you referred to but from what I
> can see around the DistributedCache class and its use, it is simply a registry
> for user-files whose life cycle is completely managed by the user (upload,
> download, delete). Files may even reside outside of flink's control and survive
> job termination.
>
> Therefore, it is a different use case which can be (mis-)used as some sort of
> blob storage. It...
> (1) will not work if there is no distributed/commonly-accessible file system,
> (2) does not provide a life cycle management for distributed files,
> (3) does not delete local files (this gets especially complicated if files are
> shared, e.g. multiple task at one TaskManager running the same job with the
> same libraries),
> ...
>
>
> Nico
>
> On Wednesday, 14 June 2017 12:29:55 CEST Chesnay Schepler wrote:
>> One thing i was wondering for a long time now is why the distributed
>> cache is not implemented
>> via the blob store.
>>
>> The DC is essentially just a copy routine with local caching and
>> automatic cleanup, so basically what
>> the blob store is supposed to do (i guess).
>>
>> On 13.06.2017 16:31, Nico Kruber wrote:
>>> Hi all,
>>> I'd like to initiate a discussion on some architectural changes for the
>>> BLOB server which finally add a proper cleanup story, remove some dead
>>> code, and extend the BLOB store's use for off-loaded (large) RPC
>>> messages.
>>>
>>> Please have a look at FLIP-19 that details the proposed changes:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB
>>> +storage+architecture
>>>
>>>
>>> Regards
>>> Nico
>>>
>>> PS: While doing the re-write, I'd also like to fix some concurrency issues
>>> in the current code.



Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Posted by Nico Kruber <ni...@data-artisans.com>.
Actually, I don't see the automatic cleanup you referred to but from what I 
can see around the DistributedCache class and its use, it is simply a registry 
for user-files whose life cycle is completely managed by the user (upload, 
download, delete). Files may even reside outside of flink's control and survive 
job termination.

Therefore, it is a different use case which can be (mis-)used as some sort of 
blob storage. It...
(1) will not work if there is no distributed/commonly-accessible file system,
(2) does not provide a life cycle management for distributed files,
(3) does not delete local files (this gets especially complicated if files are 
shared, e.g. multiple task at one TaskManager running the same job with the 
same libraries),
...


Nico

On Wednesday, 14 June 2017 12:29:55 CEST Chesnay Schepler wrote:
> One thing i was wondering for a long time now is why the distributed
> cache is not implemented
> via the blob store.
> 
> The DC is essentially just a copy routine with local caching and
> automatic cleanup, so basically what
> the blob store is supposed to do (i guess).
> 
> On 13.06.2017 16:31, Nico Kruber wrote:
> > Hi all,
> > I'd like to initiate a discussion on some architectural changes for the
> > BLOB server which finally add a proper cleanup story, remove some dead
> > code, and extend the BLOB store's use for off-loaded (large) RPC
> > messages.
> > 
> > Please have a look at FLIP-19 that details the proposed changes:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB
> > +storage+architecture
> > 
> > 
> > Regards
> > Nico
> > 
> > PS: While doing the re-write, I'd also like to fix some concurrency issues
> > in the current code.


Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Posted by Chesnay Schepler <ch...@apache.org>.
One thing i was wondering for a long time now is why the distributed 
cache is not implemented
via the blob store.

The DC is essentially just a copy routine with local caching and 
automatic cleanup, so basically what
the blob store is supposed to do (i guess).

On 13.06.2017 16:31, Nico Kruber wrote:
> Hi all,
> I'd like to initiate a discussion on some architectural changes for the BLOB
> server which finally add a proper cleanup story, remove some dead code, and
> extend the BLOB store's use for off-loaded (large) RPC messages.
>
> Please have a look at FLIP-19 that details the proposed changes:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB
> +storage+architecture
>
>
> Regards
> Nico
>
> PS: While doing the re-write, I'd also like to fix some concurrency issues in
> the current code.