Posted to user-zh@flink.apache.org by 周瑞 <ru...@woqutech.com> on 2021/08/10 11:44:45 UTC

Flink Hive file compaction error

Hello: When Flink writes to Hive, one of the files awaiting compaction went missing during file compaction, which causes the Flink job to restart over and over. What could cause the file to go missing, and how can the Flink job be brought back up normally in this situation?
2021-08-10 19:34:19
java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
    at org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
    at java.util.HashMap.forEach(HashMap.java:1288)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
    at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
    ... 19 more

Re: Flink Hive file compaction error

Posted by yidan zhao <hi...@gmail.com>.
Hi, I ran into this problem too, though in my case it was because I deleted the files myself. That said, my deletion logic only targeted partitions already marked with a success file, and within those partitions it only removed files whose names start with a dot.
The goal was to avoid useless dot-prefixed files accumulating in partitions and exhausting the filesystem's inodes, so I ran a scheduled cleanup task, restricted strictly to partitions that have a success file.
Even so, I have now hit this same problem, and it caused the job to restart.
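For reference, the cleanup policy described above can be sketched as below. This is a hypothetical illustration, not the actual cleanup script: the `_SUCCESS` marker name and the idea of sparing files that Flink's compaction may still need (names starting with `.uncompacted-` or containing `.inprogress`) are assumptions added for the sketch, precisely to show why deleting every dot-file can race with the CompactCoordinator.

```python
# Hypothetical sketch of a "safer" dot-file cleanup for one partition directory.
# Assumption: only touch hidden files once the partition has a _SUCCESS marker,
# and even then spare files that Flink's compaction may still reference.
UNSAFE_MARKERS = (".uncompacted-", ".inprogress")

def files_safe_to_delete(partition_files):
    """Given the file names in one partition, return the hidden files that
    are presumably safe to remove."""
    if "_SUCCESS" not in partition_files:
        return []  # partition not finalized: touch nothing
    return [
        name for name in partition_files
        if name.startswith(".")
        and not any(name.startswith(m) or m in name for m in UNSAFE_MARKERS)
    ]
```

With a filter like this, a partition that still contains `.uncompacted-part-...` files keeps them, which would have avoided the FileNotFoundException shown in the original post.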


Re: Flink Hive file compaction error

Posted by Rui Li <li...@gmail.com>.
There is currently no way to keep the job running; the only option is to rerun it.

There are a few separate issues here:
1. How was the file deleted? Two possibilities: Flink deleted it by mistake (which would need a fix on the Flink side), or a process outside Flink deleted it.
2. A missing file means the exactly-once guarantee can no longer be upheld; is that acceptable?
3. If losing that data is acceptable, how can the Flink job be allowed to keep running? (This needs an improvement in Flink.)
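The improvement in point 3 could, for instance, look like the sketch below: before planning compaction units, drop input files that no longer exist and log a warning instead of failing. This is purely illustrative; the function name and the injected `exists` check are assumptions, not Flink's actual CompactCoordinator code.

```python
import logging

logger = logging.getLogger("compact")

def plan_compaction(candidate_files, exists):
    """Hypothetical tolerant planning step: `candidate_files` are paths
    written before the checkpoint, `exists` probes the filesystem.
    Missing files are skipped (accepting data loss) rather than raising."""
    survivors = []
    for path in candidate_files:
        if exists(path):
            survivors.append(path)
        else:
            # Instead of an UncheckedIOException, record the loss and go on.
            logger.warning("Skipping missing uncompacted file: %s", path)
    return survivors
```

The trade-off is exactly the one listed above: the job survives, but the records in the skipped file are silently lost, so this behavior would have to be opt-in.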




-- 
Best regards!
Rui Li

Re: Flink Hive file compaction error

Posted by 周瑞 <ru...@woqutech.com>.
Hello:
  The file indeed no longer exists. Given that, is there currently any setting that lets the job keep running?

Re: Flink Hive file compaction error

Posted by Rui Li <li...@gmail.com>.
Is the file really gone? It may have been deleted by another process; you can check the HDFS audit log to find out.

Currently, Flink's exactly-once semantics for writing files rely on HDFS's consistency guarantees. If data already written to HDFS is lost, that guarantee is broken (though we could consider letting the job keep running in that situation).
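Checking the audit log for the missing file might look like the sketch below. The line layout assumed here is the common tab-separated NameNode audit format (`cmd=...\tsrc=...`); treat both the format and the field names as assumptions to adapt to your cluster.

```python
def find_delete_events(audit_lines, path_fragment):
    """Return audit-log lines recording delete/rename operations that touch
    the given path fragment. Assumes the usual tab-separated
    'cmd=delete\tsrc=/path\t...' HDFS audit layout."""
    hits = []
    for line in audit_lines:
        # Parse 'key=value' fields; non key=value chunks are ignored.
        fields = dict(
            part.split("=", 1) for part in line.split("\t") if "=" in part
        )
        if fields.get("cmd") in ("delete", "rename") and path_fragment in fields.get("src", ""):
            hits.append(line)
    return hits
```

Running this over the audit log for the `.uncompacted-part-...` name would show which user, IP, and time removed the file, which answers question 1 (Flink itself vs. an external process).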




-- 
Best regards!
Rui Li