Posted to user@flink.apache.org by Puneet Duggal <pu...@gmail.com> on 2022/01/10 07:18:35 UTC

Uploading jar to s3 for persistence

Hi, 

I am currently running a Flink HA cluster with 3 JobManagers and 3 ZooKeeper nodes. I persist my checkpoints to S3, so the required flink-s3 jars are already configured when the JobManager and TaskManager processes start. I have now set the following option:

web.upload.dir: s3p://d11-flink-job-manager-load/jars

My expectation is that a jar uploaded via the REST API will be stored at this location and will therefore be accessible to all 3 JobManagers (which in turn helps with job submission, because every JobManager can see the uploaded jar). However, when I upload a jar I get the following IllegalArgumentException, and I am not sure why. Note that the S3 location above was created before the JobManager process was started.

2022-01-09 18:12:46,790 WARN  org.apache.flink.runtime.rest.FileUploadHandler              [] - File upload failed.
java.lang.IllegalArgumentException: UploadDirectory is not absolute.
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rest.handler.FileUploads.<init>(FileUploads.java:59) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:186) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[flink-dist_2.12-1.13.1.jar:1.13.1]




Re: Uploading jar to s3 for persistence

Posted by David Morávek <dm...@apache.org>.
I understand the issue.

We currently don't have a good mechanism for this kind of external file
management (we need to avoid leaking resources) :( Even right now, we
rely on the upload directory being cleaned up by the cluster manager (YARN,
k8s), because it's tied to the container lifecycle.

I could imagine something along the lines of storing the jars in the blob
server, but that would require additional metadata that we'd need to store
in the HA services, which is something we want to avoid in general, and it
would add complexity.

Chesnay's proposal seems reasonable as this would make the REST API behave
more consistently with the CLI.

D.

On Mon, Jan 10, 2022 at 5:34 PM Puneet Duggal <pu...@gmail.com>
wrote:

> Hi,
>
> Ignore above reply. Got your point. Just one doubt. So is using java.nio.file.FileSystem
> an expectation instead of Flink’s org.apache.flink.core.fs.FileSystem. I
> mean can we raise it as an issue to use flink filesystem instead as it
> allows us to use distributed filesystem as persistent storage of jars and
> hence can be accessible to all the job managers in HA setup.
>
> Currently since web.upload.dir can only assume value of local filesystem
> dir, there are cases where  job submission request is raised to other Job
> manager and hence leads to jar not found issue.
> Another use case that it will solve is that when job manager goes
> down/restarted, currently it deletes the dir containing all the jars. Use
> persistent storage will also help in this case.
>
> Best,
> Puneet Duggal
>
> On 10-Jan-2022, at 9:40 PM, Puneet Duggal <pu...@gmail.com>
> wrote:
>
> Hi Piotr,
>
> Thank you for your immediate reply. I went through this thread and it was
> also mentioned that flink required s3-filesystem related jars which are
> present in my HA flink cluster. Also as mentioned in Apache Flink
> Documentation for Amazon S3 integration ,
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
>
> It is mentioned that we can use s3 in all locations where Flink expects
> Filesystem URI. (including high availability and RocksDB State Backend).
>
> Regards,
> Puneet Duggal
>
> On 10-Jan-2022, at 7:13 PM, Piotr Nowojski <pn...@apache.org> wrote:
>
> Hi Puneet,
>
> Have you seen this thread before? [1]. It looks like the same issue and
> especially this part might be the key:
>
> > Be aware that the filesystem used by the FileUploadHandler
> > is java.nio.file.FileSystem and not
> > Flink's org.apache.flink.core.fs.FileSystem for which we provide
> > different FileSystem implementations.
>
> Best,
> Piotrek
>
> [1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html

Re: Uploading jar to s3 for persistence

Posted by Puneet Duggal <pu...@gmail.com>.
Hi,

Please ignore my reply above; I got your point. Just one question: is it intentional that java.nio.file.FileSystem is used here instead of Flink’s org.apache.flink.core.fs.FileSystem? Could we raise an issue to use the Flink filesystem instead? That would allow a distributed filesystem to serve as persistent storage for jars, making them accessible to all the JobManagers in an HA setup.

Currently, since web.upload.dir can only point to a local filesystem directory, there are cases where a job submission request reaches a different JobManager than the one that received the upload, which leads to a jar-not-found error.
Another use case this would solve: when a JobManager goes down or is restarted, it currently deletes the directory containing all the jars. Using persistent storage would help in this case as well.

Best,
Puneet Duggal

> On 10-Jan-2022, at 9:40 PM, Puneet Duggal <pu...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Thank you for your immediate reply. I went through this thread and it was also mentioned that flink required s3-filesystem related jars which are present in my HA flink cluster. Also as mentioned in Apache Flink Documentation for Amazon S3 integration ,
> 
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
> 
> It is mentioned that we can use s3 in all locations where Flink expects Filesystem URI. (including high availability and RocksDB State Backend).
> 
> Regards, 
> Puneet Duggal
> 
>> On 10-Jan-2022, at 7:13 PM, Piotr Nowojski <pnowojski@apache.org> wrote:
>> 
>> Hi Puneet,
>> 
>> Have you seen this thread before? [1]. It looks like the same issue and especially this part might be the key:
>> 
>> > Be aware that the filesystem used by the FileUploadHandler
>> > is java.nio.file.FileSystem and not
>> > Flink's org.apache.flink.core.fs.FileSystem for which we provide different
>> > FileSystem implementations.
>> 
>> Best,
>> Piotrek
>> 
>> [1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html


Re: Uploading jar to s3 for persistence

Posted by David Morávek <dm...@apache.org>.
Hi Puneet,

This is a known limitation; unfortunately `web.upload.dir` currently
works only with the local filesystem :( There are multiple issues covering
this already; I think FLINK-16544 [1] summarizes the current state well.

This is something we want to address in future releases. We've
briefly discussed it with Chesnay, and the most straightforward / solid
approach would be to make the "REST submission" stateless (a single
endpoint that takes the jar and runs it right away).

For the 1.15 release, we'll also try to make the documentation more
explicit about which "file-based" config options only work with the local
filesystem.

[1] https://issues.apache.org/jira/browse/FLINK-16544

Best,
D.

On Mon, Jan 10, 2022 at 5:11 PM Puneet Duggal <pu...@gmail.com>
wrote:

> Hi Piotr,
>
> Thank you for your immediate reply. I went through this thread and it was
> also mentioned that flink required s3-filesystem related jars which are
> present in my HA flink cluster. Also as mentioned in Apache Flink
> Documentation for Amazon S3 integration ,
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
>
> It is mentioned that we can use s3 in all locations where Flink expects
> Filesystem URI. (including high availability and RocksDB State Backend).
>
> Regards,
> Puneet Duggal
>
> On 10-Jan-2022, at 7:13 PM, Piotr Nowojski <pn...@apache.org> wrote:
>
> Hi Puneet,
>
> Have you seen this thread before? [1]. It looks like the same issue and
> especially this part might be the key:
>
> > Be aware that the filesystem used by the FileUploadHandler
> > is java.nio.file.FileSystem and not
> > Flink's org.apache.flink.core.fs.FileSystem for which we provide
> > different FileSystem implementations.
>
> Best,
> Piotrek
>
> [1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html

Re: Uploading jar to s3 for persistence

Posted by Puneet Duggal <pu...@gmail.com>.
Hi Piotr,

Thank you for the quick reply. I went through that thread; it also mentions that Flink requires the S3 filesystem jars, which are present in my HA Flink cluster. In addition, the Apache Flink documentation for Amazon S3 integration,

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/

states that S3 can be used in all locations where Flink expects a FileSystem URI (including high availability and the RocksDB state backend).

Regards, 
Puneet Duggal

> On 10-Jan-2022, at 7:13 PM, Piotr Nowojski <pn...@apache.org> wrote:
> 
> Hi Puneet,
> 
> Have you seen this thread before? [1]. It looks like the same issue and especially this part might be the key:
> 
> > Be aware that the filesystem used by the FileUploadHandler
> > is java.nio.file.FileSystem and not
> > Flink's org.apache.flink.core.fs.FileSystem for which we provide different
> > FileSystem implementations.
> 
> Best,
> Piotrek
> 
> [1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html


Re: Uploading jar to s3 for persistence

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Puneet,

Have you seen this thread before? [1]. It looks like the same issue and
especially this part might be the key:

> Be aware that the filesystem used by the FileUploadHandler
> is java.nio.file.FileSystem and not
> Flink's org.apache.flink.core.fs.FileSystem for which we provide different
> FileSystem implementations.

Best,
Piotrek

[1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html
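
To illustrate the quoted point, here is a minimal sketch of what probably happens to the configured value. It assumes the upload directory is parsed as a plain java.nio.file.Path and then checked with isAbsolute(), which is what the "UploadDirectory is not absolute." message in the stack trace suggests; it is not a copy of Flink's code. The class name UploadDirCheck, the helper isAbsoluteLocalPath and the local directory /tmp/flink-web-upload are made up for the example, and the behaviour described assumes a Unix-like default filesystem.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class UploadDirCheck {

    // Hypothetical helper: parses the configured value with the default
    // java.nio filesystem (not Flink's FileSystem) and reports whether the
    // resulting local path is absolute.
    public static boolean isAbsoluteLocalPath(String configuredDir) {
        Path dir = Paths.get(configuredDir);
        return dir.isAbsolute();
    }

    public static void main(String[] args) {
        // Value from the original question. java.nio sees no URI scheme here,
        // only a relative path whose first element happens to be "s3p:".
        String s3Dir = "s3p://d11-flink-job-manager-load/jars";

        // Hypothetical local directory, used only for comparison.
        String localDir = "/tmp/flink-web-upload";

        System.out.println(s3Dir + " absolute? " + isAbsoluteLocalPath(s3Dir));
        System.out.println(localDir + " absolute? " + isAbsoluteLocalPath(localDir));
    }
}
```

On a Unix-like system the S3 value should be reported as not absolute and the local directory as absolute. That would explain why the exception appears even though the flink-s3 jars are installed: the S3 filesystem plugin is never consulted for this option.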


