Posted to user@flink.apache.org by Gil De Grove <gi...@digazu.com> on 2022/03/08 07:47:50 UTC

Using another FileSystem configuration while creating a job

Hello everyone,

First of all, sorry for cross-posting. I asked on SO, but David Anderson
suggested that I reach out to the community via the mailing list. The link
to the SO question is the following:
https://stackoverflow.com/questions/71381266/using-another-filesystem-configuration-while-creating-a-job

I'll post the answer on SO as soon as I have one :)

I am posting the content of the question here, so if anyone can help,
please let me know:

Summary

We are currently facing an issue with the FileSystem abstraction in Flink.
We have a job that can dynamically connect to an S3 source (meaning it's
defined at runtime). We discovered a bug in our code, and it could be due
to a wrong assumption about the way the FileSystem works.

Bug explanation

During the initialization of the job (so in the job manager), we manipulate
the FS to check that some files exist, in order to fail gracefully before
the job is executed. In our case, we need to set the FS dynamically. It can
be either HDFS, S3 on AWS or S3 on MinIO. We want the FS configuration to
be specific to the job, and different from the cluster one (different
access key, different endpoint, etc.).

Here is an extract of the code we are using to do so:

  private void validateFileSystemAccess(Configuration configuration) throws IOException {
    // Create a plugin manager from the configuration
    PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);

    // Init the FileSystem from the configuration
    FileSystem.initialize(configuration, pluginManager);

    // Validate the FileSystem: an exception is thrown if FS configuration is wrong
    Path archiverPath = new Path(this.archiverPath);
    archiverPath.getFileSystem().exists(new Path("/"));
  }

After starting that specific kind of job, we notice that:

   1. the checkpointing does not work for this job; it throws a credential
   error.
   2. the job manager cannot upload the artifacts needed by the history
   server for any of the jobs already running, of any kind (not only this
   specific kind of job).

If we do not deploy that kind of job, the upload of artifacts and the
checkpointing work as expected on the cluster.

We think that this issue might come from the FileSystem.initialize() call,
which overrides the configuration for all the FileSystems. We think that,
because of this, the next call to FileSystem.get() returns the FileSystem we
configured in validateFileSystemAccess instead of the cluster-configured
one.

Questions

Could our hypothesis be correct? If so, how could we provide a specific
configuration for the FileSystem without impacting the whole cluster?

Regards,
Gil

Re: Using another FileSystem configuration while creating a job

Posted by Gil De Grove <gi...@digazu.com>.
Hey Chesnay,

Thanks for the thorough answer, much appreciated.
Sorry for the "requesting []..."; it got lost in translation and slipped
past my proofreading. The correct verb should have been "asking" :). It
was not a request to the community at all; sorry again for that.

Implementing an `n scheme` factory that would be configured at deploy time
for our jobs seems worth investigating; it crossed our minds when we
discovered the limitation.

Thanks again for all your help, really appreciate it.

Regards,
Gil



On Thu, 10 Mar 2022 at 17:05, Chesnay Schepler <ch...@apache.org> wrote:

> > if we want to use two sets of credentials, for example to access two
> different AWS buckets, that would not be feasible at the moment?
>
> That is correct.
>
> > As it seems that this limitation is quite an important one, is there a
> place where we can find this documented?
>
> I don't think it is explicitly documented; likely because we assume that
> users configure the filesystem through the flink-conf.yaml (and only
> document it as such), which inherently prevents that.
>
> > Would it mean that in order to achieve this, we would have to set up two
> clusters and publish to a temporary medium? For example, using two
> clusters one configured to access CephFS, one for AWS S3 then publish that
> to Kafka (or use Kafka Connect)?
>
> That would be one approach, yes.
>
> > We are requesting that [it should be possible to configure credentials
> per job]
>
> It is exceedingly unlikely for this to be implemented in the foreseeable
> future.
>
>
> There are some workarounds though.
> For S3 in particular you could capitalize on the fact that we have 2
> filesystem plugins (s3-fs-hadoop and s3-fs-presto), which you could use at
> the same time so long as you use different schemes ( s3a (hadoop) / s3p
> (presto) ) for the different buckets.
> You could also generalize this by taking an existing filesystem plugin
> from Flink and adjusting the contained FileSystemFactory to use a different
> scheme and config keys. It's a bit annoying, but it should work (now and in
> the future).
> Essentially you'd pretend that there are N completely different
> filesystems, but they are actually all the same implementation just with
> different configurations.

Re: Using another FileSystem configuration while creating a job

Posted by Chesnay Schepler <ch...@apache.org>.
 > if we want to use two sets of credentials, for example to access two 
different AWS buckets, that would not be feasible at the moment?

That is correct.

 > As it seems that this limitation is quite an important one, is there 
a place where we can find this documented?

I don't think it is explicitly documented; likely because we assume that 
users configure the filesystem through the flink-conf.yaml (and only 
document it as such), which inherently prevents that.

 > Would it mean that in order to achieve this, we would have to set up 
two clusters and publish to a temporary medium? For example, using 
two clusters one configured to access CephFS, one for AWS S3 then 
publish that to Kafka (or use Kafka Connect)?

That would be one approach, yes.

 > We are requesting that [it should be possible to configure 
credentials per job]

It is exceedingly unlikely for this to be implemented in the foreseeable 
future.


There are some workarounds though.
For S3 in particular you could capitalize on the fact that we have 2 
filesystem plugins (s3-fs-hadoop and s3-fs-presto), which you could use 
at the same time so long as you use different schemes ( s3a (hadoop) / 
s3p (presto) ) for the different buckets.
You could also generalize this by taking an existing filesystem plugin 
from Flink and adjusting the contained FileSystemFactory to use a 
different scheme and config keys. It's a bit annoying, but it should 
work (now and in the future).
Essentially you'd pretend that there are N completely different 
filesystems, but they are actually all the same implementation just with 
different configurations.
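
The "N schemes, one implementation" idea can be sketched with a toy Java
model. This is not a Flink FileSystemFactory and does not use any Flink API:
the schemes s3aws and s3minio, the key layout <scheme>.endpoint and
<scheme>.access-key, and every class and method name below are assumptions
made up for illustration. It only shows how a single implementation could
pick its settings from the scheme of the path.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Toy model (NOT Flink code). All names and keys are hypothetical. */
final class SchemeScopedConfigDemo {

    /** Settings one pretend file system instance would be built from. */
    static final class FsSettings {
        final String endpoint;
        final String accessKey;

        FsSettings(String endpoint, String accessKey) {
            this.endpoint = endpoint;
            this.accessKey = accessKey;
        }

        @Override
        public String toString() {
            return "FsSettings{endpoint=" + endpoint + ", accessKey=" + accessKey + "}";
        }
    }

    /** Same logic for every scheme; only the config key prefix differs. */
    static FsSettings settingsFor(String scheme, Map<String, String> conf) {
        String prefix = scheme + ".";
        String endpoint = conf.get(prefix + "endpoint");
        String accessKey = conf.get(prefix + "access-key");
        if (endpoint == null || accessKey == null) {
            throw new IllegalArgumentException("No configuration for scheme: " + scheme);
        }
        return new FsSettings(endpoint, accessKey);
    }

    /** Picks the settings by looking only at the scheme of the path. */
    static FsSettings resolve(URI path, Map<String, String> conf) {
        return settingsFor(path.getScheme(), conf);
    }

    public static void main(String[] args) {
        // One shared configuration holding one key set per made-up scheme.
        Map<String, String> conf = new HashMap<>();
        conf.put("s3aws.endpoint", "https://s3.amazonaws.com");
        conf.put("s3aws.access-key", "aws-key");
        conf.put("s3minio.endpoint", "http://minio.example:9000");
        conf.put("s3minio.access-key", "minio-key");

        System.out.println(resolve(URI.create("s3aws://bucket-a/data"), conf));
        System.out.println(resolve(URI.create("s3minio://bucket-b/data"), conf));
    }
}
```

In this model both key sets live in the one configuration loaded at process
start, so nothing has to be re-initialized when a job is created.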

Re: Using another FileSystem configuration while creating a job

Posted by Gil De Grove <gi...@digazu.com>.
Hello Chesnay,

Thanks for the reply.

Based on your reply, I am wondering: if we want to use two sets of
credentials, for example to access two different AWS buckets, that would
not be feasible at the moment?
One example I have in mind would be to separate the credentials for
accessing data vs storing metadata for a given cluster.
Another use case would be to create a Flink job that consumes data stored on
AWS S3 and/or on MinIO.
Would it mean that in order to achieve this, we would have to set up two
clusters and publish to a temporary medium? For example, using two
clusters, one configured to access CephFS and one for AWS S3, then
publishing that to Kafka (or using Kafka Connect)?

We are requesting that, as we would like to use the Hybrid source with a
FileSystem and a Kafka consumer, and this limitation would probably make us
rethink the architecture.
As it seems that this limitation is quite an important one, is there a
place where we can find this documented? Maybe a FLIP? Or an entry in the
Flink Documentation?

Thanks again for your help,
Gil


On Thu, 10 Mar 2022 at 10:57, Chesnay Schepler <ch...@apache.org> wrote:

> The FileSystem class is essentially one big singleton, with only 1
> instance of each FileSystem implementation being loaded, shared across all
> jobs.
> For that reason we do not support job-specific FileSystem configurations.
> Note that we generally also don't really support configuring the
> FileSystems at runtime. The entire codebase assumes that the initialization
> happens when the process is started.
>
> You'll need to run that job in a separate cluster.
>
> Overall, this sounds like something that should run externally; assert
> some precondition, then configure Flink appropriately, then run the job.

Re: Using another FileSystem configuration while creating a job

Posted by Chesnay Schepler <ch...@apache.org>.
The FileSystem class is essentially one big singleton, with only 1 
instance of each FileSystem implementation being loaded, shared across 
all jobs.
For that reason we do not support job-specific FileSystem configurations.
Note that we generally also don't really support configuring the 
FileSystems at runtime. The entire codebase assumes that the 
initialization happens when the process is started.
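
To make the singleton behaviour concrete, here is a toy Java model. It is
not Flink code: the class SharedFsRegistryDemo, its methods and the key
values are made up for illustration, and only the key name s3.access-key is
borrowed from Flink's S3 options. It only shows why re-initializing shared
static state from one job changes what every other user in the same process
sees.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model (NOT Flink code) of process-wide file system settings. */
final class SharedFsRegistryDemo {

    // One settings map for the whole JVM, standing in for static singleton state.
    private static Map<String, String> settings = new HashMap<>();

    private SharedFsRegistryDemo() {}

    /** Replaces the settings for every caller in the process. */
    static synchronized void initialize(Map<String, String> newSettings) {
        settings = new HashMap<>(newSettings);
    }

    /** Returns the access key that any component would currently see. */
    static synchronized String accessKey() {
        return settings.get("s3.access-key");
    }

    public static void main(String[] args) {
        // Process start: the cluster-wide credentials are installed.
        initialize(Collections.singletonMap("s3.access-key", "cluster-key"));
        System.out.println("checkpointing sees: " + accessKey());

        // A job re-initializes the shared state with its own credentials...
        initialize(Collections.singletonMap("s3.access-key", "job-key"));

        // ...and every other user of the shared state now sees the job's key.
        System.out.println("checkpointing sees: " + accessKey());
    }
}
```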

You'll need to run that job in a separate cluster.

Overall, this sounds like something that should run externally; assert 
some precondition, then configure Flink appropriately, then run the job.
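
The "assert, configure, run" order could look roughly like the following
standalone Java sketch, which runs outside of Flink. Everything in it is an
assumption for illustration: the class ExternalPrecheckDemo, the
planSubmission helper, the job.jar name, and the use of a local path check as
a stand-in for whatever S3/HDFS/MinIO check the job really needs. It only
builds the `flink run` command line and does not execute it.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Sketch of an external pre-check (hypothetical names, runs outside Flink). */
final class ExternalPrecheckDemo {

    /**
     * Step 1: assert the precondition. Step 3: return the command to run.
     * Step 2 (configuring Flink for this run) is deployment-specific and is
     * left out of this sketch.
     */
    static List<String> planSubmission(BooleanSupplier inputExists, String jobJar) {
        if (!inputExists.getAsBoolean()) {
            throw new IllegalStateException("Precondition failed: required input is missing");
        }
        return Arrays.asList("flink", "run", jobJar);
    }

    public static void main(String[] args) {
        // Stand-in check: a local path; a real check would query the target store.
        String input = args.length > 0 ? args[0] : ".";
        List<String> command =
                planSubmission(() -> Files.exists(Paths.get(input)), "job.jar");

        // Printed rather than executed, so the sketch has no side effects.
        System.out.println(String.join(" ", command));
    }
}
```

Because the check fails before anything is submitted, the cluster's own
FileSystem initialization is never touched by the validation step.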
