Posted to user@flink.apache.org by Conrad Crampton <co...@SecData.com> on 2017/08/16 16:41:22 UTC

Reload DistributedCache file?

Hi,
I have a simple text file stored in HDFS which I use in a RichFilterFunction by way of a DistributedCache file. The file is edited externally from time to time to add further lines. My FilterFunction also implements Runnable, and its run method is scheduled via scheduleAtFixedRate on a ScheduledExecutorService; it reloads the file and stores the results in a List in the Filter class.

I have since realized the error of my ways: the file being reloaded is the cached copy in the temporary file location on the node where this instance of the Filter class runs, not the file in HDFS itself (that copy was made when the Flink job started).

Can anyone suggest a solution to this? It is, I think, a similar problem to the one the Add Side Inputs in Flink proposal [1] is trying to address, but that is not finalized yet.
Alternatively, can anyone see a problem with having a thread in the main body of my Flink program that reloads the HDFS file and re-registers the cached file as part of that reload process, e.g.

env.registerCachedFile(properties.getProperty("whitelist.location"), WHITELIST);

i.e. does this actually copy the file again from HDFS to the temporary files on each node? I think I would still need the reload schedule I currently have inside my Filter function as well, since all the above would do is push the HDFS file to the temp location; it would not actually refresh my List.
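For reference, the reload-on-a-schedule pattern described above can be sketched in plain Java. All names here (WhitelistReloader, lineSource) are illustrative, not from this thread; in the real job the line supplier would read the file from HDFS directly (e.g. via FileSystem.open) rather than from the cached local copy, the reloader would be created in the RichFilterFunction's open() method, and contains() would be called from filter():

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Illustrative sketch: periodically re-read a whitelist and atomically swap it in.
class WhitelistReloader {
    private final Supplier<List<String>> lineSource; // in the real job: reads the HDFS file
    private volatile Set<String> whitelist = Collections.emptySet();
    private final ScheduledExecutorService scheduler;

    WhitelistReloader(Supplier<List<String>> lineSource, long periodSeconds) {
        this.lineSource = lineSource;
        reload(); // initial load, as open() would do
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "whitelist-reload");
            t.setDaemon(true); // do not keep the task alive on shutdown
            return t;
        });
        scheduler.scheduleAtFixedRate(this::reload, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    void reload() {
        Set<String> fresh = new HashSet<>();
        for (String line : lineSource.get()) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                fresh.add(trimmed);
            }
        }
        whitelist = fresh; // volatile write: one atomic swap, so filter() never sees a half-loaded set
    }

    boolean contains(String value) {
        return whitelist.contains(value);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Swapping the whole set via a volatile reference avoids locking in the hot filter path; only the reload thread mutates anything.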

Any suggestions would be welcome.

Thanks
Conrad

[1] https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit#heading=h.pqg5z6g0mjm7


SecureData, combating cyber threats
______________________________________________________________________ 
The information contained in this message or any of its attachments may be privileged and confidential and intended for the exclusive use of the intended recipient. If you are not the intended recipient any disclosure, reproduction, distribution or other dissemination or use of this communications is strictly prohibited. The views expressed in this email are those of the individual and not necessarily of SecureData Europe Ltd. Any prices quoted are only valid if followed up by a formal written quote.

SecureData Europe Limited. Registered in England & Wales 04365896. Registered Address: SecureData House, Hermitage Court, Hermitage Lane, Maidstone, Kent, ME16 9NT

Re: Reload DistributedCache file?

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

You are right in your analysis: updating the entry in the DistributedCache will not in fact update the file on already-running operators. The DistributedCache is only read when an operator is initially set up. You might instead be able to use a file source (with the continuous processing mode) together with a connected stream, where the first input is your normal input and the second input is the (periodically re-read) contents of your file in HDFS.
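A minimal sketch of that approach, with the co-processing logic shown as a plain class so the idea is clear (all names are illustrative, not from this thread):

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Sketch of the co-processing logic for the connected-stream approach.
// In Flink this would be a CoFlatMapFunction<String, String, String> used as
//   mainStream.connect(whitelistStream).flatMap(new ...)
// where whitelistStream comes from a file source read in continuous mode
// (FileProcessingMode.PROCESS_CONTINUOUSLY), so edits to the HDFS file
// flow into the operator as ordinary records.
class WhitelistCoFlatMap {
    private final Set<String> whitelist = new HashSet<>();

    // Corresponds to flatMap2: one line of the (periodically re-read) whitelist file.
    void onWhitelistLine(String line) {
        String trimmed = line.trim();
        if (!trimmed.isEmpty()) {
            whitelist.add(trimmed);
        }
    }

    // Corresponds to flatMap1: pass a record through only if it is whitelisted.
    Optional<String> onRecord(String value) {
        return whitelist.contains(value) ? Optional.of(value) : Optional.empty();
    }
}
```

With parallelism greater than one, the whitelist input would typically be broadcast so that every parallel filter instance sees all updates.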

Would this work for you?

Best,
Aljoscha

> On 17. Aug 2017, at 10:00, Conrad Crampton <co...@SecData.com> wrote:
> 
> Whilst this looks great, and thanks for pointing it out, the problem isn't so much identifying changes in the HDFS file (the overhead of re-loading it is insignificant, as it is only tens of lines long) as refreshing it at a local level (on the taskmanager node) while still using the Flink mechanisms, i.e. the DistributedCache.
>  
> I may have to read HDFS directly in the FilterFunction to keep it up to date.
>  
> Thanks
> Conrad
>  
> From: Ted Yu <yu...@gmail.com>
> Date: Wednesday, 16 August 2017 at 18:11
> To: Conrad Crampton <co...@SecData.com>
> Cc: "user@flink.apache.org" <us...@flink.apache.org>
> Subject: Re: Reload DistributedCache file?
>  
> For HDFS, there is the iNotify mechanism.
>  
> https://issues.apache.org/jira/browse/HDFS-6634 <https://issues.apache.org/jira/browse/HDFS-6634>
>  
> https://www.slideshare.net/Hadoop_Summit/keep-me-in-the-loop-inotify-in-hdfs <https://www.slideshare.net/Hadoop_Summit/keep-me-in-the-loop-inotify-in-hdfs>
>  
> FYI
>  
> On Wed, Aug 16, 2017 at 9:41 AM, Conrad Crampton <conrad.crampton@secdata.com <ma...@secdata.com>> wrote:
> [...]


Re: Reload DistributedCache file?

Posted by Conrad Crampton <co...@SecData.com>.
Whilst this looks great, and thanks for pointing it out, the problem isn't so much identifying changes in the HDFS file (the overhead of re-loading it is insignificant, as it is only tens of lines long) as refreshing it at a local level (on the taskmanager node) while still using the Flink mechanisms, i.e. the DistributedCache.

I may have to read HDFS directly in the FilterFunction to keep it up to date.

Thanks
Conrad

From: Ted Yu <yu...@gmail.com>
Date: Wednesday, 16 August 2017 at 18:11
To: Conrad Crampton <co...@SecData.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Reload DistributedCache file?

For HDFS, there is the iNotify mechanism.

https://issues.apache.org/jira/browse/HDFS-6634

https://www.slideshare.net/Hadoop_Summit/keep-me-in-the-loop-inotify-in-hdfs

FYI

On Wed, Aug 16, 2017 at 9:41 AM, Conrad Crampton <co...@secdata.com> wrote:
[...]

Re: Reload DistributedCache file?

Posted by Ted Yu <yu...@gmail.com>.
For HDFS, there is the iNotify mechanism.

https://issues.apache.org/jira/browse/HDFS-6634

https://www.slideshare.net/Hadoop_Summit/keep-me-in-the-loop-inotify-in-hdfs

FYI

On Wed, Aug 16, 2017 at 9:41 AM, Conrad Crampton <conrad.crampton@secdata.com> wrote:

> [...]