Posted to issues@flink.apache.org by "hanjie (Jira)" <ji...@apache.org> on 2022/01/13 07:27:00 UTC

[jira] [Commented] (FLINK-25057) Streaming File Sink writing to HDFS

    [ https://issues.apache.org/jira/browse/FLINK-25057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475147#comment-17475147 ] 

hanjie commented on FLINK-25057:
--------------------------------

Hi [~zhisheng], we can set the part file prefix (PartPrefix), for example: part-uuid-

When we restart the job, the file names will not be duplicated.
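
A minimal sketch of the idea, assuming the prefix is generated once per job start. The class name PartPrefix and the method newPrefix are hypothetical helpers, not part of Flink; only the standard library is used so the snippet runs on its own. The comments show where the value would be passed to Flink's OutputFileConfig.

```java
import java.util.UUID;

public class PartPrefix {

    // Hypothetical helper: returns a prefix such as "part-<uuid>".
    // Each call yields a different value, so every job start gets its own prefix.
    public static String newPrefix() {
        return "part-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String prefix = newPrefix();

        // With Flink on the classpath, the prefix would be handed to the sink, e.g.:
        //   OutputFileConfig config =
        //       OutputFileConfig.builder().withPartPrefix(prefix).build();
        //   StreamingFileSink.forRowFormat(path, encoder)
        //       .withOutputFileConfig(config)
        //       .build();
        // (path and encoder are placeholders for the job's own output path and encoder.)

        // Part files are named <prefix>-<subtaskIndex>-<partCounter>, so the first
        // file of subtask 0 would look like this:
        System.out.println(prefix + "-0-0");
    }
}
```

Because the prefix changes on every start, a part counter that restarts from zero no longer produces names that clash with files left by an earlier run. The in-progress files of the earlier run are still left behind and are not finalized by this change.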

 

 

> Streaming File Sink writing to  HDFS
> ------------------------------------
>
>                 Key: FLINK-25057
>                 URL: https://issues.apache.org/jira/browse/FLINK-25057
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.12.1
>            Reporter: hanjie
>            Priority: Major
>
> Env: Flink 1.12.1
> kafka --> hdfs 
> hdfs : Streaming File Sink
> When I first start the Flink job:
>     *First part file example:*
>        part-0-0
>        part-0-1
>       .part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c
>   I cancel the Flink job, then restart it without a savepoint or checkpoint. The job runs for a while.
>    *Second part file example:*
>           part-0-0
>           part-0-1
>           .part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c
>           .part-0-0.inprogress.0e2f234b-042d-4232-a5f7-c980f04ca82d
>     '.part-0-2.inprogress.952eb958-dac9-4f2c-b92f-9084ed536a1c' is not renamed, and the bucketIndex starts from zero again.
>      I looked at the related code: the job needs to be started from a savepoint or checkpoint. I chose a savepoint, and the problem above disappeared in my third test.
>     But if I use an expired savepoint, the job throws an exception:
>      java.io.FileNotFoundException: File does not exist: /ns-hotel/hotel_sa_log/stream/sa_cpc_ad_log_list_detail_dwd/2021-11-25/.part-6-1537.inprogress.cd9c756a-1756-4dc5-9325-485fe99a2803
>          at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
>          at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>          at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>          at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>          at org.apache.hadoop.fs.FileSystem.resolvePath(FileSystem.java:752)
>          at org.apache.hadoop.fs.FilterFileSystem.resolvePath(FilterFileSystem.java:153)
>          at org.apache.hadoop.fs.viewfs.ChRootedFileSystem.resolvePath(ChRootedFileSystem.java:373)
>          at org.apache.hadoop.fs.viewfs.ViewFileSystem.resolvePath(ViewFileSystem.java:243)
>          at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.revokeLeaseByFileSystem(HadoopRecoverableFsDataOutputStream.java:327)
>          at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:163)
>          at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:88)
>          at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:86)
>          at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.resumeInProgressFileFrom(OutputStreamBasedPartFileWriter.java:104)
>          at org.apache.flink.streaming.api.functions.sink.filesyst
>   The job sets 'execution.checkpointing.interval' to 1min, and I trigger a savepoint every five minutes.
>    Does anyone have a suggested solution?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)